数据仓库平台团队最头疼的事不是写新功能,而是日复一日回答"这个查询为什么慢""表为什么没更新""权限怎么配"。Grab 的 Central Data Team 也被这类重复性支持请求拖住了手脚,直到他们用多 Agent AI 系统把调查和优化两类工作拆开、交给专门的 Agent 处理,才把工程师的时间从"灭火"拉回到真正的平台建设上。
问题:工程支持请求的规模陷阱
Grab 的数据仓库平台服务大量内部团队,支持请求的典型特征:
- 高频重复——同样的权限问题、数据延迟告警、查询失败排查,每周出现几十次。
- 上下文分散——一个问题可能涉及 Metastore、调度系统、存储层、权限服务多个子系统。
- 人工瓶颈——资深工程师处理一个中等复杂度的调查请求平均需要 30–60 分钟,而新人往往更慢且容易遗漏关键线索。
当请求量随业务增长线性上升,人力处理就变成不可持续的方案。单 Agent LLM 调用虽然能回答简单问题,但在需要多步推理、跨系统取证的场景下容易"跑偏"或遗漏步骤。Grab 的选择是:把任务拆给多个专职 Agent,由一个编排层协调它们的工作顺序和信息传递。
架构拆解:调查 Agent 与增强 Agent 分工
Grab 系统的核心设计决策是把工程支持工作分成两条截然不同的工作流:
| 工作流 | 职责 | 典型任务 |
|---|---|---|
| Investigation(调查) | 定位问题根因 | 查日志、比对配置、追踪数据血缘、确认权限状态 |
| Enhancement(增强) | 执行改进动作 | 生成优化建议、起草配置变更、编写修复脚本、更新文档 |
为什么要拆开?因为调查需要的是搜索与推理——在大量系统信息中找到关键证据链;增强需要的是生成与执行——基于结论产出可操作的方案。一个 Agent 同时做两件事,容易在"还没查清楚就开始写方案"的陷阱里浪费 token 并产出不可靠结果。
每个 Agent 内部还会进一步细分角色,例如调查流可能包含:
- Log Analyst Agent:解析任务执行日志,提取异常时间点和错误码。
- Metadata Inspector Agent:查询 Metastore,确认表结构、分区状态、最近更新时间。
- Dependency Tracer Agent:沿数据血缘向上追溯,判断是否上游延迟导致下游失败。
增强流则可能包含:
- Config Advisor Agent:基于调查结论,生成参数调整建议。
- Script Generator Agent:产出可执行的修复 SQL 或调度配置片段。
- Doc Updater Agent:将此次问题和方案沉淀到知识库。
编排层:让 Agent 之间不"打架"
多 Agent 系统最大的工程挑战不是单个 Agent 的能力,而是它们之间的协调。Grab 使用了一个独立的 Orchestration Layer 来解决三个问题:
- 任务路由——收到请求后,判断该走调查流、增强流还是两者串联。
- 上下文传递——调查 Agent 的结论必须结构化地传给增强 Agent,而不是丢一段自然语言文本让对方自行理解。
- 失败回退——某个 Agent 超时或返回低置信度结果时,编排层决定是重试、换路径还是升级给人工。
编排层本质上是一个状态机,每个请求的生命周期被建模为一系列阶段转换。下面用一个简化示例展示这种编排思路。
实践示例:用 Python 搭一个最小多 Agent 编排框架
这个示例不还原 Grab 的完整实现(其内部集成了很多 Grab 特有的系统),而是展示"调查 + 增强 + 编排"的核心模式,你可以在此基础上接入自己的工具和 LLM。
import json
import time
from dataclasses import dataclass, field
from typing import Optional
# ── 1. 定义结构化上下文(Agent 之间传递信息的契约)──
@dataclass
class InvestigationResult:
root_cause: str
evidence: list[str] = field(default_factory=list)
confidence: float = 0.0 # 0-1
needs_human: bool = False
@dataclass
class EnhancementPlan:
actions: list[str] = field(default_factory=list)
generated_script: str = ""
doc_update: str = ""
# ── 2. Agent 基类 ──
class Agent:
def __init__(self, name: str):
self.name = name
def run(self, task: str, context: dict) -> dict:
"""子类实现具体逻辑,这里用 mock 示意"""
raise NotImplementedError
# ── 3. 调查流 Agent ──
class LogAnalystAgent(Agent):
"""模拟:从日志中提取异常信息"""
def run(self, task: str, context: dict) -> dict:
# 实际实现会调用日志系统 API + LLM 分析
return {
"agent": self.name,
"error_codes": ["TIMEOUT", "PARTITION_MISSING"],
"failed_at": "2024-06-12T03:15:00Z",
"raw_log_snippet": "Task X failed: partition 2024-06-11 not found in table Y"
}
class MetadataInspectorAgent(Agent):
"""模拟:检查 Metastore 中的表/分区状态"""
def run(self, task: str, context: dict) -> dict:
log_result = context.get("log_analysis", {})
table_name = "Y" # 实际应从 log_result 中提取
return {
"agent": self.name,
"table": table_name,
"last_partition": "2024-06-10",
"expected_partition": "2024-06-11",
"partition_gap_detected": True
}
# ── 4. 增强流 Agent ──
class ScriptGeneratorAgent(Agent):
"""模拟:基于调查结论生成修复脚本"""
def run(self, task: str, context: dict) -> dict:
inv_result = context.get("investigation_result")
if not inv_result or inv_result.confidence < 0.6:
return {"agent": self.name, "skip_reason": "low confidence, defer to human"}
# 实际实现会用 LLM 生成针对具体系统的脚本
return {
"agent": self.name,
"fix_script": f"-- Add missing partition for {inv_result.root_cause}\n"
f"ALTER TABLE Y ADD PARTITION (ds='2024-06-11') "
f"LOCATION 's3://bucket/Y/ds=2024-06-11/';",
"estimated_risk": "low"
}
# ── 5. 编排层 ──
class Orchestrator:
def __init__(self):
self.log_agent = LogAnalystAgent("log-analyst")
self.meta_agent = MetadataInspectorAgent("meta-inspector")
self.script_agent = ScriptGeneratorAgent("script-generator")
def handle_request(self, request: str) -> dict:
context = {"original_request": request}
# ── Phase 1: Investigation ──
print(f"[Orchestrator] Starting investigation for: {request}")
log_result = self.log_agent.run(request, context)
context["log_analysis"] = log_result
meta_result = self.meta_agent.run(request, context)
context["meta_analysis"] = meta_result
# 综合调查结论
inv_result = InvestigationResult(
root_cause=f"Partition gap: {meta_result['last_partition']} -> "
f"{meta_result['expected_partition']} missing",
evidence=[
log_result["raw_log_snippet"],
f"Metastore shows last_partition={meta_result['last_partition']}"
],
confidence=0.85 if meta_result["partition_gap_detected"] else 0.3,
needs_human=not meta_result["partition_gap_detected"]
)
context["investigation_result"] = inv_result
# ── 决策门:置信度不够则升级人工 ──
if inv_result.needs_human or inv_result.confidence < 0.5:
print("[Orchestrator] Low confidence — escalating to human.")
return {
"status": "escalated",
"investigation": inv_result,
"reason": "confidence below threshold"
}
# ── Phase 2: Enhancement ──
print("[Orchestrator] Confidence OK — proceeding to enhancement.")
script_result = self.script_agent.run(request, context)
context["script_result"] = script_result
enhancement = EnhancementPlan(
actions=["add_missing_partition"],
generated_script=script_result["fix_script"],
doc_update=f"Root cause: {inv_result.root_cause}. Fix: add partition."
)
return {
"status": "resolved_by_agent",
"investigation": inv_result,
"enhancement": enhancement,
"requires_human_review": script_result.get("estimated_risk") != "low"
}
# ── 6. 运行 ──
if __name__ == "__main__":
orch = Orchestrator()
result = orch.handle_request(
"数据表 Y 的 6月12日 ETL 任务失败,请排查并修复"
)
print(json.dumps(result, default=lambda o: o.__dict__, indent=2, ensure_ascii=False))
运行输出(简化):
{
"status": "resolved_by_agent",
"investigation": {
"root_cause": "Partition gap: 2024-06-10 -> 2024-06-11 missing",
"evidence": ["...log snippet...", "Metastore shows last_partition=2024-06-10"],
"confidence": 0.85,
"needs_human": false
},
"enhancement": {
"actions": ["add_missing_partition"],
"generated_script": "ALTER TABLE Y ADD PARTITION ...",
"doc_update": "Root cause: Partition gap..."
},
"requires_human_review": false
}
改造要点:
- 把每个 Agent 的
run方法替换为真实调用——日志系统 API、Metastore 查询接口、LLM 生成调用。 InvestigationResult的confidence可以由 LLM 自评 + 规则校验双重计算。- 编排层可以扩展为异步执行,用消息队列解耦 Agent 之间的调用。
- 生产环境务必加上审计日志和人工审批门(尤其是增强流要执行写操作时)。
效果与取舍
Grab 报告的收益集中在三方面:
- 运营负载下降——重复性调查请求被 Agent 自动完成,工程师不再逐条手动排查。
- 解决速度提升——Agent 并行取证比人串行翻系统快,平均响应时间显著缩短。 -时间重新分配——工程师从灭火模式切换到平台改进模式,把精力投入到预防性工作。
但多 Agent 系统也有不容忽视的成本:
| 成本 | 说明 |
|---|---|
| 编排复杂度 | Agent 数量增加后,状态机分支和异常路径会快速膨胀,需要清晰的失败策略 |
| LLM 不确定性 | 每个 Agent 内部如果依赖 LLM 推理,输出不稳定会传导到下游 Agent |
| 上下文丢失风险 | Agent 之间传递结构化信息时,设计不当会导致关键线索被截断 |
| 安全边界 | 增强流 Agent 生成脚本/配置后,自动执行还是人工审批?Grab 选择需要审批 |
上手清单
如果你也在考虑用多 Agent 系统处理工程支持,建议按这个顺序推进:
- 先统计请求类型——把过去 3 个月的支持请求分类,找出占比最高的 3–5 种重复模式,它们才是 Agent 化的首选目标。
- 先做调查流——增强流涉及写操作,风险更高;调查流只读,可以先上线验证 Agent 的推理质量。
- 定义结构化上下文契约——Agent 之间传递的信息必须用 schema 约束(如上面的
InvestigationResult),不要依赖自然语言传递关键结论。 - 设置置信度门槛——低于门槛的结论必须升级人工,不要让低质量推理进入增强流。
- 增强流必须有人工审批——生成的脚本和配置变更在执行前需要人工确认,至少在初期阶段。
- 逐步增加 Agent——从 2–3 个 Agent 开始,验证编排逻辑稳定后再拆分更多专职角色。
Grab 的实践说明,多 Agent 系统在工程支持场景的价值不是"让 AI 替代工程师",而是把重复性调查和标准化修复从人的待办列表里移走,让工程师专注于只有人才能做好的架构决策和平台演进。