用 Bedrock AgentCore 搭建企业级对话式 BI 助手:AWS SMGS 的 NarrateAI 实战拆解

2026-05-28 30 预计阅读时间:1 分钟
来源:aws.amazon.com AI 摘要 原文链接

免责声明:本文为 AI 摘要整理,建议结合原文阅读。摘要可能省略上下文、版本差异或边界条件,不作为官方说明。

预计阅读时间:15 分钟

AWS 内部的 Sales, Marketing and Global Services(SMGS)组织每天要处理海量业务数据——销售管线、营销 ROI、全球服务工单,数据散落在数十个系统里。传统 BI 仪表盘能看数,但没法"对话"。SMGS 团队用 Amazon Bedrock AgentCore 搭了 NarrateAI,让业务人员直接用自然语言提问,背后是一套两层架构 + 多智能体协作的生产级方案。这篇文章拆解它的核心设计,并给出可落地的代码模板。

为什么需要两层架构

NarrateAI 最关键的设计决策:把批量数据处理实时交互拆成两层。

业务数据有几个现实约束:

  • 数据量大、更新频率低(比如每日汇总的销售报表),适合离线预计算;
  • 用户提问是实时的,期望秒级响应,不能等一轮 ETL 跑完;
  • 预计算结果需要校验,防止"AI 信心满满地给了错误数字"。

两层架构的分工:

职责 触发方式 响应时间
Batch Layer 数据摄取、清洗、聚合、指标预计算 定时 / 事件驱动 分钟~小时级
Real-time Layer 意图识别、Agent 路由、查询编排、结果校验、对话生成 用户请求 秒级

Batch Layer 把结果写入向量索引和结构化存储;Real-time Layer 从这些存储里检索,再由多个专用 Agent 协作完成推理和校验。这样用户问"上季度亚太区新客户增长多少",Real-time Layer 不需要重新跑聚合,直接命中预计算指标,再由校验 Agent 确认数字来源可靠。

多 Agent 协作:路由与校验

NarrateAI 不是"一个万能 Agent 包打天下",而是拆成多个专用 Agent,由一个 Router Agent 做分发:

  1. Router Agent——解析用户意图,判断该走哪个下游 Agent。比如"帮我对比 Q3 和 Q4 的 pipeline"走 Comparison Agent,"列出 top 10 客户"走 Ranking Agent。
  2. Comparison Agent——处理时间区间对比、跨维度对比类查询,从预计算指标中取数并生成对比表格。
  3. Ranking Agent——处理排序、Top-N 类查询。
  4. Validation Agent——独立校验其他 Agent 返回的数据:检查指标是否来自可信数据源、数值是否在合理区间、是否与已知基准一致。

Validation Agent 是生产部署中最容易被忽略但最关键的一环。它不是简单打一个置信度分数,而是做三件事:

  • 溯源:每个数字必须能追溯到具体数据源表和时间戳;
  • 边界检查:增长率超过 500%?大概率是数据异常,不是业务奇迹;
  • 交叉验证:与已发布的官方报表做数值比对,偏差超过阈值就标记警告。

这种"生成—校验"双 Agent 模式,比单 Agent 自我检查可靠得多——校验 Agent 的 prompt 和工具集与生成 Agent 完全隔离,避免同源偏差。

生产级工程模式

NarrateAI 在生产环境踩过的坑,提炼出几个可复用的模式:

模式一:Agent 输出的结构化约束

不让 Agent 自由输出自然语言数字,而是要求返回 JSON schema 约定的结构。前端再根据结构渲染表格或图表。这样做的好处:校验 Agent 可以对 JSON 字段做类型和范围检查,而不是解析一段模糊的文字。

模式二:数据源注册与权限绑定

每个数据源在 AgentCore 的 Action Group 里注册时,同时绑定 IAM 权限策略。Agent 只能访问它被授权的数据源。Comparison Agent 不应该能直接读原始工单表——它只需要预计算的汇总指标。

模式三:对话上下文的滑动窗口

长对话中,Agent 的上下文窗口会溢出。NarrateAI 的做法:只保留最近 5 轮对话的完整内容,更早的对话压缩成一句话摘要塞进 system prompt。这样既保持上下文连贯,又控制 token 成本。

模式四:灰度发布与 A/B 回退

新 Agent 版本上线时,Router Agent 先把 10% 的流量路由到新版本,对比校验 Agent 的警告率。警告率飙升就自动回退到旧版本。

实践:用 Bedrock AgentCore 搭一个简化版 BI 助手

下面给出一个最小可运行的示例,展示两层架构的核心骨架。你需要一个 AWS 账号,且已在 us-east-1 区域开通 Bedrock AgentCore 访问权限。

第一步:创建 Agent 和 Action Group

import boto3
import time
import json

client = boto3.client("bedrock-agent", region_name="us-east-1")

# 创建 Router Agent
router_agent = client.create_agent(
    agentName="bi-router-agent",
    description="Route business queries to specialized agents",
    agentResourceRoleArn="arn:aws:iam::YOUR_ACCOUNT_ID:role/BedrockAgentRuntimeRole",
    idleSessionTTLInSeconds=600,
    foundationModel="anthropic.claude-3-5-sonnet-20241022-v2:0",
    instruction=(
        "You are a BI query router. Analyze the user's question and decide which action to take:\n"
        "- comparison_query: for time-period or cross-dimension comparisons\n"
        "- ranking_query: for top-N or sorting requests\n"
        "- general_query: for everything else\n"
        "Always respond with a JSON object: {\"action\": \"<action_name>\", \"params\": {...}}"
    ),
)

router_agent_id = router_agent["agent"]["agentId"]
print(f"Router Agent ID: {router_agent_id}")

# 创建 Action Group —— 对应 Batch Layer 预计算的指标查询
action_group = client.create_agent_action_group(
    agentId=router_agent_id,
    agentVersion="DRAFT",
    actionGroupName="QueryPrecomputedMetrics",
    description="Query precomputed business metrics from the batch layer",
    actionGroupExecutor={
        "lambda": "arn:aws:lambda:us-east-1:YOUR_ACCOUNT_ID:function:query-metrics"
    },
    functionSchema={
        "functions": [
            {
                "name": "comparison_query",
                "description": "Compare metrics between two time periods",
                "parameters": {
                    "metric": {"type": "string", "description": "e.g. new_customer_count, pipeline_value"},
                    "period_a": {"type": "string", "description": "e.g. 2024-Q3"},
                    "period_b": {"type": "string", "description": "e.g. 2024-Q4"},
                    "region": {"type": "string", "description": "e.g. APAC, GLOBAL", "required": False},
                },
            },
            {
                "name": "ranking_query",
                "description": "Get top-N ranking for a metric",
                "parameters": {
                    "metric": {"type": "string", "description": "e.g. revenue, deal_count"},
                    "top_n": {"type": "integer", "description": "number of results, default 10"},
                    "period": {"type": "string", "description": "e.g. 2024-Q4"},
                },
            },
        ]
    },
)

print(f"Action Group ID: {action_group['agentActionGroup']['actionGroupId']}")

运行前替换 YOUR_ACCOUNT_ID 和 Lambda ARN。Lambda 函数的实现见下一步。

第二步:Batch Layer 的 Lambda 函数(简化版)

这个 Lambda 模拟从预计算存储中查指标。生产环境中,它会读 DynamoDB 或 Redshift 的汇总表。

# Lambda function: query-metrics
# 文件名: lambda_function.py

import json

# 模拟 Batch Layer 预计算结果
PRECOMPUTED = {
    "new_customer_count": {
        "2024-Q3": {"GLOBAL": 320, "APAC": 85, "NA": 150, "EU": 85},
        "2024-Q4": {"GLOBAL": 410, "APAC": 120, "NA": 180, "EU": 110},
    },
    "pipeline_value": {
        "2024-Q3": {"GLOBAL": 52_000_000, "APAC": 14_000_000, "NA": 22_000_000, "EU": 16_000_000},
        "2024-Q4": {"GLOBAL": 68_000_000, "APAC": 19_000_000, "NA": 28_000_000, "EU": 21_000_000},
    },
}

def comparison_query(params):
    metric = params["metric"]
    pa, pb = params["period_a"], params["period_b"]
    region = params.get("region", "GLOBAL")
    val_a = PRECOMPUTED[metric][pa][region]
    val_b = PRECOMPUTED[metric][pb][region]
    change_pct = round((val_b - val_a) / val_a * 100, 1) if val_a != 0 else None
    return {
        "metric": metric,
        "region": region,
        "period_a": {"label": pa, "value": val_a},
        "period_b": {"label": pb, "value": val_b},
        "change_pct": change_pct,
        "source": "daily_batch_aggregation",
        "computed_at": "2025-01-02T06:00:00Z",
    }

def ranking_query(params):
    metric = params["metric"]
    period = params["period"]
    top_n = params.get("top_n", 10)
    data = PRECOMPUTED[metric][period]
    sorted_items = sorted(data.items(), key=lambda x: x[1], reverse=True)[:top_n]
    return {
        "metric": metric,
        "period": period,
        "ranking": [{"region": r, "value": v} for r, v in sorted_items],
        "source": "daily_batch_aggregation",
        "computed_at": "2025-01-02T06:00:00Z",
    }

def lambda_handler(event, context):
    # Bedrock AgentCore 调用 Lambda 时传入的 event 结构
    action = event.get("actionGroup", "")
    function = event.get("function", "")
    parameters = {p["name"]: p["value"] for p in event.get("parameters", [])}

    if function == "comparison_query":
        result = comparison_query(parameters)
    elif function == "ranking_query":
        result = ranking_query(parameters)
    else:
        result = {"error": f"Unknown function: {function}"}

    # AgentCore 要求的响应格式
    return {
        "response": {
            "actionGroup": action,
            "function": function,
            "message": json.dumps(result),
        }
    }

第三步:准备 Agent 并调用

# 准备 Agent(发布 DRAFT 版本为可用版本)
prepare_resp = client.prepare_agent(agentId=router_agent_id)
print(f"Agent prepared: {prepare_resp['agent']['agentStatus']}")

# 等待准备完成
time.sleep(10)

# 创建 Agent Alias(用于运行时调用)
alias_resp = client.create_agent_alias(
    agentId=router_agent_id,
    agentAliasName="production",
)
alias_id = alias_resp["agentAlias"]["agentAliasId"]
print(f"Alias ID: {alias_id}")

# 运行时调用 —— 用户提问
runtime_client = boto3.client("bedrock-agent-runtime", region_name="us-east-1")

response = runtime_client.invoke_agent(
    agentId=router_agent_id,
    agentAliasId=alias_id,
    sessionId="demo-session-001",
    inputText="亚太区 2024 Q3 到 Q4 新客户增长了多少?",
)

# 流式读取响应
for chunk in response["completion"]:
    if "chunk" in chunk:
        print(chunk["chunk"]["bytes"].decode("utf-8"), end="")
print()

第四步:加上 Validation Agent(伪代码示意)

Validation Agent 是独立 Agent,接收其他 Agent 的输出做校验。下面展示它的 instruction 设计思路:

validation_agent = client.create_agent(
    agentName="bi-validation-agent",
    description="Validate outputs from other BI agents",
    agentResourceRoleArn="arn:aws:iam::YOUR_ACCOUNT_ID:role/BedrockAgentRuntimeRole",
    foundationModel="anthropic.claude-3-5-sonnet-20241022-v2:0",
    instruction=(
        "You are a data validation agent. You receive a JSON payload from another agent.\n"
        "Check the following:\n"
        "1. Does every numeric value have a 'source' and 'computed_at' timestamp?\n"
        "2. Is change_pct within [-200%, 200%]? Flag anything outside as suspicious.\n"
        "3. Does the region match known valid regions: GLOBAL, APAC, NA, EU?\n"
        "Return a JSON: {\"valid\": true/false, \"warnings\": [...], \"checked_fields\": [...]}\n"
        "Never modify the data. Only validate."
    ),
)

生产环境中,Validation Agent 会通过 Action Group 调用另一个 Lambda,该 Lambda 读取官方报表做交叉比对。这里用 prompt 约束做轻量校验,已经能挡住大部分低级错误。

落地建议与取舍

决策点 推荐 原因
是否一定要两层架构 数据量大、更新频率低 → 是;实时数据源为主 → 可以合并 两层的核心收益是避免实时查询跑重聚合
Agent 拆多细 3-5 个专用 Agent + 1 个 Router 太多 Agent 增加路由复杂度,太少则 prompt 混杂、校验难隔离
校验 Agent 是否必须 生产环境必须 没有独立校验,Agent 会"自信地犯错",且难以发现
Batch Layer 用什么存储 DynamoDB(低延迟指标查询)+ Redshift(复杂聚合) 根据查询模式选,别全塞一个引擎
灰度策略 Router 层做流量切分,校验 Agent 监控警告率 比全量上线后再回退安全得多

几个常见坑:

  • Batch Layer 数据过期:预计算指标有 TTL,Agent 返回结果时必须带 computed_at,前端展示时标注数据时效,否则用户拿上周的数做今天的决策。
  • Router 误判意图:Router Agent 的 prompt 要覆盖典型问法变体,比如"增长"和"变化"可能走不同 Agent。建议用真实对话日志持续微调 Router 的 instruction。
  • 校验 Agent 的假阳性:边界阈值设太紧,正常波动也会被标记。初始阈值建议宽松(比如 ±300%),上线后根据实际分布收紧。

NarrateAI 的核心启示不是"用 Bedrock AgentCore 就能搞定一切",而是:两层架构解决数据时效与响应速度的矛盾,多 Agent 协作解决 prompt 混杂与校验隔离的矛盾。这两个矛盾是所有企业级对话式 BI 都会遇到的,方案可以换(换成 LangGraph、换成自建编排),但架构思路不变。


相关推荐