系统架构总览
基于 Anthropic Claude API 构建的多角色智能体协作系统,三个专业角色由主控调度器统一管理
用户输入
自然语言描述任务
↓
主控调度器(Orchestrator)
claude-sonnet-4-6 · 任务拆解 · 角色分发 · 结果整合
💻
程序员智能体
代码生成 · 技术方案
接口设计 · Bug修复
接口设计 · Bug修复
code_exec
file_write
web_search
🎨
设计师智能体
UI/UX方案 · 视觉规范
原型设计 · 交互说明
原型设计 · 交互说明
image_gen
color_tools
figma_api
📊
运营智能体
内容策划 · 数据分析
增长策略 · 竞品调研
增长策略 · 竞品调研
web_search
analytics
spreadsheet
共享上下文 / 记忆层
项目背景 · 历史决策 · 任务状态 · 跨角色数据同步
Redis
Vector DB
↓
汇总输出
代码 + 设计方案 + 运营策略 + 执行报告
角色配置详情
每个角色的系统提示词、能力边界与工具调用配置
主控调度器
Orchestrator · claude-sonnet-4-6
核心角色
核心职责
任务意图理解子任务拆解
依赖关系分析并行调度
结果整合质量评估
错误恢复
System Prompt
你是一家互联网公司的 AI 项目经理(Orchestrator)。
你管理三名专家智能体:
- engineer:资深全栈工程师,负责技术实现
- designer:UI/UX 设计师,负责视觉与交互
- operator:运营专家,负责推广与数据分析
# 工作流程
1. 接收用户任务,分析目标与约束
2. 将任务拆解为结构化子任务列表
3. 评估子任务依赖,决定串行/并行
4. 分发给对应角色并收集结果
5. 整合所有输出,生成最终交付物
# 输出格式
每次拆解任务时,必须返回 JSON 格式:
{"tasks": [{"id": "t1", "agent": "engineer",
"instruction": "...", "depends_on": []}]}
程序员智能体
Engineer Agent · claude-sonnet-4-6
能力边界
需求分析架构设计
代码生成API设计
数据库设计性能优化
Bug分析代码审查
技术文档测试方案
可调用工具
code_execution
file_write
web_search
bash_exec
git_tools
System Prompt
你是一名资深全栈工程师(Engineer Agent)。
# 技术栈偏好
后端:Python/FastAPI, Node.js
前端:React, TypeScript
数据库:PostgreSQL, Redis
部署:Docker, K8s
# 输出要求
- 所有代码需包含注释和类型标注
- 提供接口文档(OpenAPI格式)
- 指出潜在的安全风险
- 给出性能估算(QPS/延迟)
# 协作规范
读取共享记忆中设计师的 UI 规范
与运营协商数据采集埋点方案
输出格式
✦ 代码文件
✦ 架构图说明
✦ API 文档
✦ 部署指南
设计师智能体
Designer Agent · claude-sonnet-4-6
能力边界
用户体验分析信息架构
视觉规范制定配色方案
组件设计响应式布局
原型交互说明品牌设计
无障碍设计动效规范
可调用工具
image_generation
color_palette
figma_api
icon_search
web_search
System Prompt
你是一名资深 UI/UX 设计师(Designer Agent)。
# 设计原则
- 以用户为中心,数据驱动决策
- 遵循 Material Design 3 或 Apple HIG
- 注重一致性与可扩展的设计系统
- 无障碍标准:WCAG 2.1 AA
# 输出要求
- 提供完整的 Design Token(色值/字体/间距)
- 说明关键页面的交互流程
- 给出组件状态(默认/悬停/禁用/错误)
- 输出 Tailwind CSS 兼容的设计规范
# 协作规范
将配色方案写入共享记忆供程序员使用
输出格式
✦ Design Token
✦ 页面线框图说明
✦ 组件规范
✦ 交互说明文档
运营智能体
Operator Agent · claude-sonnet-4-6
能力边界
用户增长策略内容策划
竞品分析数据指标定义
活动方案渠道策略
用户分层留存分析
AB测试方案推广文案
可调用工具
web_search
data_analysis
spreadsheet
chart_gen
competitor_scan
System Prompt
你是一名资深互联网运营专家(Operator Agent)。
# 分析框架
增长:AARRR 漏斗(获取/激活/留存/变现/传播)
内容:PESO 模型(付费/赚取/分享/自有)
数据:定义 北极星指标 + 辅助指标体系
# 输出要求
- 给出量化的增长目标(DAU/MAU/转化率)
- 制定上线后 30 天的运营节奏
- 列出关键数据埋点需求(对接程序员)
- 提供 3 个不同预算的推广方案
# 协作规范
将埋点需求写入共享记忆供程序员实现
输出格式
✦ 增长策略报告
✦ 数据指标体系
✦ 30天运营计划
✦ 埋点需求文档
完整代码实现
基于 Anthropic Python SDK 的完整多智能体系统实现
1. 角色配置 & 系统提示词
定义每个智能体的 System Prompt、工具权限和输出规范
agent_config.py
Python
# agent_config.py — 智能体角色配置
from dataclasses import dataclass, field
from typing import List, Dict, Any
@dataclass
class AgentConfig:
name: str
role: str
system_prompt: str
tools: List[str]
model: str = "claude-sonnet-4-6"
temperature: float = 0.7
max_tokens: int = 4096
# ─── 主控调度器 ────────────────────────────────────
ORCHESTRATOR_CONFIG = AgentConfig(
name="orchestrator",
role="项目经理",
system_prompt="""
你是一家互联网公司的 AI 项目经理(Orchestrator)。
你管理三名专家智能体:
- engineer: 资深全栈工程师,负责技术实现
- designer: UI/UX 设计师,负责视觉与交互
- operator: 运营专家,负责推广与数据分析
## 工作流程
1. 接收用户任务,深度理解目标与约束条件
2. 将大任务拆解为独立子任务,分配给合适角色
3. 分析子任务依赖关系,决定串行还是并行执行
4. 收集所有角色输出,整合成最终交付物
5. 对结果进行质量评估,必要时要求返工
## 任务拆解输出格式(必须严格遵守)
返回 JSON 格式:
{
"project_summary": "项目简介",
"tasks": [
{
"id": "t1",
"agent": "engineer|designer|operator",
"priority": 1,
"instruction": "具体指令",
"expected_output": "预期产出",
"depends_on": [],
"parallel_ok": true
}
]
}
""",
tools=[],
temperature=0.3
)
# ─── 程序员智能体 ─────────────────────────────────
ENGINEER_CONFIG = AgentConfig(
name="engineer",
role="全栈工程师",
system_prompt="""
你是一名资深全栈工程师(Engineer Agent)。
## 技术能力
后端:Python/FastAPI, Node.js/Express, Go
前端:React, TypeScript, Next.js
数据库:PostgreSQL, Redis, MongoDB
基础设施:Docker, K8s, AWS/阿里云
安全:OAuth2, JWT, SQL注入防护, XSS防护
## 工作规范
- 代码必须包含完整类型注解和 docstring
- 提供 RESTful API 文档(OpenAPI 3.0 格式)
- 指出潜在的安全漏洞和性能瓶颈
- 给出关键接口的 QPS 承载能力估算
- 生产代码需包含错误处理和日志记录
## 与其他角色协作
- 读取共享记忆中设计师输出的 Design Token
- 根据运营提出的数据埋点需求设计 Analytics API
- 将技术选型决策写入共享记忆供其他角色参考
## 输出物
1. 核心功能代码(带注释)
2. API 接口文档
3. 数据库 Schema
4. 部署 Dockerfile + docker-compose
5. 技术风险说明
""",
tools=["code_execution", "file_write", "web_search"]
)
# ─── 设计师智能体 ─────────────────────────────────
DESIGNER_CONFIG = AgentConfig(
name="designer",
role="UI/UX设计师",
system_prompt="""
你是一名资深 UI/UX 设计师(Designer Agent)。
## 设计能力
- 用户研究:用户访谈、竞品分析、用户画像
- 信息架构:页面结构、导航设计、内容组织
- 视觉设计:Design System、配色、排版、图标
- 交互设计:流程设计、状态机、动效规范
- 可用性:无障碍设计(WCAG 2.1 AA)
## 设计规范
- 遵循 Material Design 3 或 Apple HIG
- 优先考虑移动端优先策略
- 提供 Light / Dark 两套主题
- 使用 8px 基础网格系统
## 输出 Design Token 格式(标准)
{
"colors": {
"primary": "#...", "primary-hover": "#...",
"background": "#...", "surface": "#...",
"text-primary": "#...", "text-secondary": "#..."
},
"typography": {
"font-family": "...",
"heading-xl": {"size": "32px", "weight": 700, "line-height": 1.2},
"body": {"size": "16px", "weight": 400, "line-height": 1.6}
},
"spacing": {"xs": "4px", "sm": "8px", "md": "16px", "lg": "24px"},
"border-radius": {"sm": "6px", "md": "10px", "lg": "16px"}
}
## 输出物
1. Design Token JSON
2. 关键页面线框图说明(文字描述 + 布局结构)
3. 组件状态规范(至少 4 种状态)
4. 交互流程图(Mermaid 格式)
5. 响应式断点策略
""",
tools=["image_generation", "web_search", "color_tools"]
)
# ─── 运营智能体 ────────────────────────────────────
OPERATOR_CONFIG = AgentConfig(
name="operator",
role="运营专家",
system_prompt="""
你是一名资深互联网运营专家(Operator Agent)。
## 运营能力
- 用户增长:获客渠道、转化漏斗、病毒传播系数
- 内容运营:内容策略、SEO、社交媒体矩阵
- 数据运营:指标体系、数据看板、AB 测试设计
- 活动运营:促销活动、裂变方案、用户激励体系
- 商业化:定价策略、付费转化、LTV 优化
## 分析框架
- 增长:AARRR 漏斗(获取/激活/留存/变现/传播)
- 内容:PESO 模型(付费/赚取/分享/自有媒体)
- 竞品:4C 分析(用户/成本/便利/沟通)
## 数据指标体系
北极星指标 + 3层辅助指标:
- L1:业务结果(收入/活跃用户)
- L2:关键行为(功能使用率/转化率)
- L3:过程指标(浏览深度/停留时长)
## 输出物
1. 北极星指标定义 + 指标树
2. 上线后 30 天运营节奏表
3. 竞品分析报告(至少3个对标产品)
4. 用户增长策略(保守/激进/最优 三个方案)
5. 数据埋点需求文档(供程序员实现)
6. 推广文案示例(3种渠道 × 3种文案)
""",
tools=["web_search", "data_analysis", "spreadsheet"]
)
AGENT_CONFIGS: Dict[str, AgentConfig] = {
"orchestrator": ORCHESTRATOR_CONFIG,
"engineer": ENGINEER_CONFIG,
"designer": DESIGNER_CONFIG,
"operator": OPERATOR_CONFIG,
}
2. 共享记忆层
跨智能体的上下文共享、状态管理与持久化存储
memory.py
Python
# memory.py — 共享记忆层(线程安全)
import json
from datetime import datetime
from threading import Lock
from typing import Any, Dict, List, Optional
class SharedMemory:
"""跨智能体共享上下文,线程安全"""
def __init__(self):
self._lock = Lock()
self._store: Dict[str, Any] = {
"project": {}, # 项目元信息
"decisions": {}, # 跨角色决策记录
"task_results": {}, # 各任务执行结果
"task_status": {}, # 任务执行状态
"design_tokens": {}, # 设计规范(设计师写入)
"tech_stack": {}, # 技术选型(工程师写入)
"analytics_events": [], # 埋点需求(运营写入)
"timeline": [], # 操作时间线
}
def set(self, key: str, value: Any, agent: str = "system"):
"""写入记忆,自动记录来源和时间"""
with self._lock:
self._store[key] = value
self._store["timeline"].append({
"agent": agent, "key": key,
"action": "write",
"timestamp": datetime.now().isoformat()
})
def get(self, key: str, default: Any = None) -> Any:
with self._lock:
return self._store.get(key, default)
def update_task_status(self, task_id: str, status: str):
# status: "pending" | "running" | "done" | "failed"
with self._lock:
self._store["task_status"][task_id] = {
"status": status,
"updated_at": datetime.now().isoformat()
}
def save_result(self, task_id: str, result: str, agent: str):
with self._lock:
self._store["task_results"][task_id] = {
"agent": agent, "result": result,
"created_at": datetime.now().isoformat()
}
def get_context_for_agent(self, agent_name: str) -> str:
"""生成特定角色的上下文摘要,注入 System Prompt"""
context = {
"project": self._store.get("project"),
"decisions": self._store.get("decisions"),
}
if agent_name == "engineer":
context["design_tokens"] = self._store.get("design_tokens")
context["analytics_events"] = self._store.get("analytics_events")
elif agent_name == "designer":
context["tech_stack"] = self._store.get("tech_stack")
elif agent_name == "operator":
context["tech_stack"] = self._store.get("tech_stack")
return json.dumps(context, ensure_ascii=False, indent=2)
def snapshot(self) -> Dict:
with self._lock:
return dict(self._store)
3. 主控调度器
任务拆解、并行调度与结果整合的核心逻辑
orchestrator.py
Python
# orchestrator.py — 主控调度器
import asyncio, json
import anthropic
from typing import Dict, List
from agent_config import AGENT_CONFIGS
from memory import SharedMemory
client = anthropic.Anthropic()
class SubAgent:
"""单个子智能体的执行单元"""
def __init__(self, config, memory: SharedMemory):
self.config = config
self.memory = memory
async def execute(self, task_id: str, instruction: str) -> str:
self.memory.update_task_status(task_id, "running")
# 将共享上下文注入消息
ctx = self.memory.get_context_for_agent(self.config.name)
full_instruction = f"""
## 当前项目上下文
{ctx}
## 你的任务
{instruction}
请完成上述任务,并将关键决策写入输出末尾的 [MEMORY_UPDATE] 标记区域。
"""
response = await asyncio.to_thread(
client.messages.create,
model=self.config.model,
max_tokens=self.config.max_tokens,
system=self.config.system_prompt,
messages=[{"role": "user", "content": full_instruction}]
)
result = response.content[0].text
# 解析并保存记忆更新
self._extract_and_save_memory(result, task_id)
self.memory.update_task_status(task_id, "done")
return result
def _extract_and_save_memory(self, result: str, task_id: str):
self.memory.save_result(task_id, result, self.config.name)
# 根据角色自动提取关键信息
if self.config.name == "designer" and "design_tokens" in result.lower():
self.memory.set("design_tokens", result, "designer")
elif self.config.name == "engineer":
self.memory.set("tech_stack", result[:500], "engineer")
elif self.config.name == "operator" and "埋点" in result:
self.memory.set("analytics_events", result[:1000], "operator")
class Orchestrator:
"""主控调度器,负责任务拆解与并行调度"""
def __init__(self):
self.memory = SharedMemory()
self.agents = {
name: SubAgent(cfg, self.memory)
for name, cfg in AGENT_CONFIGS.items()
if name != "orchestrator"
}
self.orc_config = AGENT_CONFIGS["orchestrator"]
async def run(self, user_task: str, callback=None) -> str:
# Step 1: 调度器拆解任务
if callback: callback("orchestrator", "正在分析任务并拆解子任务...")
plan = await self._plan(user_task)
tasks = plan.get("tasks", [])
self.memory.set("project", {"summary": plan.get("project_summary"), "user_task": user_task})
# Step 2: 按优先级和依赖并行执行
results = {}
priority_groups = self._group_by_priority(tasks)
for group in priority_groups:
# 同优先级且无相互依赖的任务并行执行
parallel_tasks = [
t for t in group
if all(dep in results for dep in t.get("depends_on", []))
]
coroutines = []
for task in parallel_tasks:
agent = self.agents.get(task["agent"])
if callback: callback(task["agent"], f"开始执行: {task['instruction'][:50]}...")
coroutines.append(agent.execute(task["id"], task["instruction"]))
group_results = await asyncio.gather(*coroutines)
for task, result in zip(parallel_tasks, group_results):
results[task["id"]] = result
# Step 3: 汇总输出
if callback: callback("orchestrator", "所有任务完成,正在整合输出...")
return await self._aggregate(user_task, results)
async def _plan(self, task: str) -> Dict:
resp = await asyncio.to_thread(
client.messages.create,
model=self.orc_config.model,
max_tokens=2048,
system=self.orc_config.system_prompt,
messages=[{"role": "user", "content": f"请拆解以下任务:{task}"}]
)
text = resp.content[0].text
# 提取 JSON
start = text.find("{")
end = text.rfind("}") + 1
return json.loads(text[start:end]) if start >= 0 else {"tasks": []}
def _group_by_priority(self, tasks: List) -> List[List]:
groups = {}
for t in tasks:
p = t.get("priority", 1)
groups.setdefault(p, []).append(t)
return [groups[k] for k in sorted(groups.keys())]
async def _aggregate(self, task: str, results: Dict) -> str:
summary = "\n\n".join(
f"### {tid}\n{content[:800]}"
for tid, content in results.items()
)
resp = await asyncio.to_thread(
client.messages.create,
model=self.orc_config.model, max_tokens=3000,
system=self.orc_config.system_prompt,
messages=[{"role": "user", "content": f"""
原始任务:{task}
各角色输出:{summary}
请整合以上内容,生成结构清晰的最终交付物报告。"""}]
)
return resp.content[0].text
# ─── 使用示例 ─────────────────────────────────────
async def main():
orc = Orchestrator()
def on_update(agent: str, msg: str):
print(f"[{agent.upper()}] {msg}")
result = await orc.run(
"开发一个 SaaS 会员订阅系统,支持月付/年付,需要完整的前后端",
callback=on_update
)
print("\n=== 最终输出 ===")
print(result)
if __name__ == "__main__":
asyncio.run(main())
▶ 任务输入
预设:
◈ 协作消息流
输入任务并点击启动,观察三个智能体如何协作
⚡ 智能体状态
Orchestrator
待命
Engineer
待命
Designer
待命
Operator
待命
总体进度0%
🗄️ 共享记忆层
// 等待任务启动...