AWS 内部的 Sales, Marketing and Global Services(SMGS)组织每天要处理海量业务数据——销售管线、营销 ROI、全球服务工单,数据散落在数十个系统里。传统 BI 仪表盘能看数,但没法"对话"。SMGS 团队用 Amazon Bedrock AgentCore 搭了 NarrateAI,让业务人员直接用自然语言提问,背后是一套两层架构 + 多智能体协作的生产级方案。这篇文章拆解它的核心设计,并给出可落地的代码模板。
为什么需要两层架构
NarrateAI 最关键的设计决策:把批量数据处理和实时交互拆成两层。
业务数据有几个现实约束:
- 数据量大、更新频率低(比如每日汇总的销售报表),适合离线预计算;
- 用户提问是实时的,期望秒级响应,不能等一轮 ETL 跑完;
- 预计算结果需要校验,防止"AI 信心满满地给了错误数字"。
两层架构的分工:
| 层 | 职责 | 触发方式 | 响应时间 |
|---|---|---|---|
| Batch Layer | 数据摄取、清洗、聚合、指标预计算 | 定时 / 事件驱动 | 分钟~小时级 |
| Real-time Layer | 意图识别、Agent 路由、查询编排、结果校验、对话生成 | 用户请求 | 秒级 |
Batch Layer 把结果写入向量索引和结构化存储;Real-time Layer 从这些存储里检索,再由多个专用 Agent 协作完成推理和校验。这样用户问"上季度亚太区新客户增长多少",Real-time Layer 不需要重新跑聚合,直接命中预计算指标,再由校验 Agent 确认数字来源可靠。
多 Agent 协作:路由与校验
NarrateAI 不是"一个万能 Agent 包打天下",而是拆成多个专用 Agent,由一个 Router Agent 做分发:
- Router Agent——解析用户意图,判断该走哪个下游 Agent。比如"帮我对比 Q3 和 Q4 的 pipeline"走 Comparison Agent,"列出 top 10 客户"走 Ranking Agent。
- Comparison Agent——处理时间区间对比、跨维度对比类查询,从预计算指标中取数并生成对比表格。
- Ranking Agent——处理排序、Top-N 类查询。
- Validation Agent——独立校验其他 Agent 返回的数据:检查指标是否来自可信数据源、数值是否在合理区间、是否与已知基准一致。
Validation Agent 是生产部署中最容易被忽略但最关键的一环。它不是简单打一个置信度分数,而是做三件事:
- 溯源:每个数字必须能追溯到具体数据源表和时间戳;
- 边界检查:增长率超过 500%?大概率是数据异常,不是业务奇迹;
- 交叉验证:与已发布的官方报表做数值比对,偏差超过阈值就标记警告。
这种"生成—校验"双 Agent 模式,比单 Agent 自我检查可靠得多——校验 Agent 的 prompt 和工具集与生成 Agent 完全隔离,避免同源偏差。
生产级工程模式
NarrateAI 在生产环境踩过的坑,提炼出几个可复用的模式:
模式一:Agent 输出的结构化约束
不让 Agent 自由输出自然语言数字,而是要求返回 JSON schema 约定的结构。前端再根据结构渲染表格或图表。这样做的好处:校验 Agent 可以对 JSON 字段做类型和范围检查,而不是解析一段模糊的文字。
模式二:数据源注册与权限绑定
每个数据源在 AgentCore 的 Action Group 里注册时,同时绑定 IAM 权限策略。Agent 只能访问它被授权的数据源。Comparison Agent 不应该能直接读原始工单表——它只需要预计算的汇总指标。
模式三:对话上下文的滑动窗口
长对话中,Agent 的上下文窗口会溢出。NarrateAI 的做法:只保留最近 5 轮对话的完整内容,更早的对话压缩成一句话摘要塞进 system prompt。这样既保持上下文连贯,又控制 token 成本。
模式四:灰度发布与 A/B 回退
新 Agent 版本上线时,Router Agent 先把 10% 的流量路由到新版本,对比校验 Agent 的警告率。警告率飙升就自动回退到旧版本。
实践:用 Bedrock AgentCore 搭一个简化版 BI 助手
下面给出一个最小可运行的示例,展示两层架构的核心骨架。你需要一个 AWS 账号,且已在 us-east-1 区域开通 Bedrock AgentCore 访问权限。
第一步:创建 Agent 和 Action Group
import boto3
import time
import json
client = boto3.client("bedrock-agent", region_name="us-east-1")
# 创建 Router Agent
router_agent = client.create_agent(
agentName="bi-router-agent",
description="Route business queries to specialized agents",
agentResourceRoleArn="arn:aws:iam::YOUR_ACCOUNT_ID:role/BedrockAgentRuntimeRole",
idleSessionTTLInSeconds=600,
foundationModel="anthropic.claude-3-5-sonnet-20241022-v2:0",
instruction=(
"You are a BI query router. Analyze the user's question and decide which action to take:\n"
"- comparison_query: for time-period or cross-dimension comparisons\n"
"- ranking_query: for top-N or sorting requests\n"
"- general_query: for everything else\n"
"Always respond with a JSON object: {\"action\": \"<action_name>\", \"params\": {...}}"
),
)
router_agent_id = router_agent["agent"]["agentId"]
print(f"Router Agent ID: {router_agent_id}")
# 创建 Action Group —— 对应 Batch Layer 预计算的指标查询
action_group = client.create_agent_action_group(
agentId=router_agent_id,
agentVersion="DRAFT",
actionGroupName="QueryPrecomputedMetrics",
description="Query precomputed business metrics from the batch layer",
actionGroupExecutor={
"lambda": "arn:aws:lambda:us-east-1:YOUR_ACCOUNT_ID:function:query-metrics"
},
functionSchema={
"functions": [
{
"name": "comparison_query",
"description": "Compare metrics between two time periods",
"parameters": {
"metric": {"type": "string", "description": "e.g. new_customer_count, pipeline_value"},
"period_a": {"type": "string", "description": "e.g. 2024-Q3"},
"period_b": {"type": "string", "description": "e.g. 2024-Q4"},
"region": {"type": "string", "description": "e.g. APAC, GLOBAL", "required": False},
},
},
{
"name": "ranking_query",
"description": "Get top-N ranking for a metric",
"parameters": {
"metric": {"type": "string", "description": "e.g. revenue, deal_count"},
"top_n": {"type": "integer", "description": "number of results, default 10"},
"period": {"type": "string", "description": "e.g. 2024-Q4"},
},
},
]
},
)
print(f"Action Group ID: {action_group['agentActionGroup']['actionGroupId']}")
运行前替换
YOUR_ACCOUNT_ID和 Lambda ARN。Lambda 函数的实现见下一步。
第二步:Batch Layer 的 Lambda 函数(简化版)
这个 Lambda 模拟从预计算存储中查指标。生产环境中,它会读 DynamoDB 或 Redshift 的汇总表。
# Lambda function: query-metrics
# 文件名: lambda_function.py
import json
# 模拟 Batch Layer 预计算结果
PRECOMPUTED = {
"new_customer_count": {
"2024-Q3": {"GLOBAL": 320, "APAC": 85, "NA": 150, "EU": 85},
"2024-Q4": {"GLOBAL": 410, "APAC": 120, "NA": 180, "EU": 110},
},
"pipeline_value": {
"2024-Q3": {"GLOBAL": 52_000_000, "APAC": 14_000_000, "NA": 22_000_000, "EU": 16_000_000},
"2024-Q4": {"GLOBAL": 68_000_000, "APAC": 19_000_000, "NA": 28_000_000, "EU": 21_000_000},
},
}
def comparison_query(params):
metric = params["metric"]
pa, pb = params["period_a"], params["period_b"]
region = params.get("region", "GLOBAL")
val_a = PRECOMPUTED[metric][pa][region]
val_b = PRECOMPUTED[metric][pb][region]
change_pct = round((val_b - val_a) / val_a * 100, 1) if val_a != 0 else None
return {
"metric": metric,
"region": region,
"period_a": {"label": pa, "value": val_a},
"period_b": {"label": pb, "value": val_b},
"change_pct": change_pct,
"source": "daily_batch_aggregation",
"computed_at": "2025-01-02T06:00:00Z",
}
def ranking_query(params):
metric = params["metric"]
period = params["period"]
top_n = params.get("top_n", 10)
data = PRECOMPUTED[metric][period]
sorted_items = sorted(data.items(), key=lambda x: x[1], reverse=True)[:top_n]
return {
"metric": metric,
"period": period,
"ranking": [{"region": r, "value": v} for r, v in sorted_items],
"source": "daily_batch_aggregation",
"computed_at": "2025-01-02T06:00:00Z",
}
def lambda_handler(event, context):
# Bedrock AgentCore 调用 Lambda 时传入的 event 结构
action = event.get("actionGroup", "")
function = event.get("function", "")
parameters = {p["name"]: p["value"] for p in event.get("parameters", [])}
if function == "comparison_query":
result = comparison_query(parameters)
elif function == "ranking_query":
result = ranking_query(parameters)
else:
result = {"error": f"Unknown function: {function}"}
# AgentCore 要求的响应格式
return {
"response": {
"actionGroup": action,
"function": function,
"message": json.dumps(result),
}
}
第三步:准备 Agent 并调用
# 准备 Agent(发布 DRAFT 版本为可用版本)
prepare_resp = client.prepare_agent(agentId=router_agent_id)
print(f"Agent prepared: {prepare_resp['agent']['agentStatus']}")
# 等待准备完成
time.sleep(10)
# 创建 Agent Alias(用于运行时调用)
alias_resp = client.create_agent_alias(
agentId=router_agent_id,
agentAliasName="production",
)
alias_id = alias_resp["agentAlias"]["agentAliasId"]
print(f"Alias ID: {alias_id}")
# 运行时调用 —— 用户提问
runtime_client = boto3.client("bedrock-agent-runtime", region_name="us-east-1")
response = runtime_client.invoke_agent(
agentId=router_agent_id,
agentAliasId=alias_id,
sessionId="demo-session-001",
inputText="亚太区 2024 Q3 到 Q4 新客户增长了多少?",
)
# 流式读取响应
for chunk in response["completion"]:
if "chunk" in chunk:
print(chunk["chunk"]["bytes"].decode("utf-8"), end="")
print()
第四步:加上 Validation Agent(伪代码示意)
Validation Agent 是独立 Agent,接收其他 Agent 的输出做校验。下面展示它的 instruction 设计思路:
validation_agent = client.create_agent(
agentName="bi-validation-agent",
description="Validate outputs from other BI agents",
agentResourceRoleArn="arn:aws:iam::YOUR_ACCOUNT_ID:role/BedrockAgentRuntimeRole",
foundationModel="anthropic.claude-3-5-sonnet-20241022-v2:0",
instruction=(
"You are a data validation agent. You receive a JSON payload from another agent.\n"
"Check the following:\n"
"1. Does every numeric value have a 'source' and 'computed_at' timestamp?\n"
"2. Is change_pct within [-200%, 200%]? Flag anything outside as suspicious.\n"
"3. Does the region match known valid regions: GLOBAL, APAC, NA, EU?\n"
"Return a JSON: {\"valid\": true/false, \"warnings\": [...], \"checked_fields\": [...]}\n"
"Never modify the data. Only validate."
),
)
生产环境中,Validation Agent 会通过 Action Group 调用另一个 Lambda,该 Lambda 读取官方报表做交叉比对。这里用 prompt 约束做轻量校验,已经能挡住大部分低级错误。
落地建议与取舍
| 决策点 | 推荐 | 原因 |
|---|---|---|
| 是否一定要两层架构 | 数据量大、更新频率低 → 是;实时数据源为主 → 可以合并 | 两层的核心收益是避免实时查询跑重聚合 |
| Agent 拆多细 | 3-5 个专用 Agent + 1 个 Router | 太多 Agent 增加路由复杂度,太少则 prompt 混杂、校验难隔离 |
| 校验 Agent 是否必须 | 生产环境必须 | 没有独立校验,Agent 会"自信地犯错",且难以发现 |
| Batch Layer 用什么存储 | DynamoDB(低延迟指标查询)+ Redshift(复杂聚合) | 根据查询模式选,别全塞一个引擎 |
| 灰度策略 | Router 层做流量切分,校验 Agent 监控警告率 | 比全量上线后再回退安全得多 |
几个常见坑:
- Batch Layer 数据过期:预计算指标有 TTL,Agent 返回结果时必须带
computed_at,前端展示时标注数据时效,否则用户拿上周的数做今天的决策。 - Router 误判意图:Router Agent 的 prompt 要覆盖典型问法变体,比如"增长"和"变化"可能走不同 Agent。建议用真实对话日志持续微调 Router 的 instruction。
- 校验 Agent 的假阳性:边界阈值设太紧,正常波动也会被标记。初始阈值建议宽松(比如 ±300%),上线后根据实际分布收紧。
NarrateAI 的核心启示不是"用 Bedrock AgentCore 就能搞定一切",而是:两层架构解决数据时效与响应速度的矛盾,多 Agent 协作解决 prompt 混杂与校验隔离的矛盾。这两个矛盾是所有企业级对话式 BI 都会遇到的,方案可以换(换成 LangGraph、换成自建编排),但架构思路不变。