数据平台团队最头疼的不是技术难题,而是日复一日的重复性支持请求:表权限报错、管道延迟、数据质量异常……这些工单把工程师拖进"救火循环",真正该做的平台基建工作反而被挤到角落。Grab 的 Central Data Team 用一套多智能体系统把这类重复任务自动化,让工程师的精力重新回到平台建设上。
救火循环到底烧掉了什么
数据仓库平台一旦服务数百个业务团队,支持请求的量级会迅速失控。典型场景:
- 调查类请求:某个报表数据不对,需要排查上游管道是否延迟、字段是否被改、权限是否缺失。
- 增强类请求:新业务团队需要接入数据,要创建 schema、配置权限、设置 SLA 告警。
两类请求性质不同,但传统做法都靠人工逐条处理。调查类请求需要跨多个系统查日志、看元数据、比对配置;增强类请求需要按模板执行一系列操作。工程师每天在这些低杠杆任务上消耗数小时,响应速度慢,团队士气也受影响。
Grab 的核心洞察:这两类请求可以拆开,分别交给不同专长的智能体处理。
调查与增强:两条流水线,一个调度中枢
Grab 的系统架构围绕一个关键拆分展开:
- Investigation Agents:负责诊断类任务——查日志、读元数据、比对配置、定位根因,最终生成诊断报告。
- Enhancement Agents:负责执行类任务——创建资源、配置权限、设置监控,完成变更并返回确认。
- Orchestration Layer:接收原始请求,判断属于调查还是增强(或两者兼有),将任务分发给对应智能体,协调执行顺序,汇总结果返回给请求方。
这种拆分的实际好处是:调查智能体只需要"读"权限,增强智能体需要"写"权限,两者权限边界清晰,审计和回滚也更容易。如果某个增强操作失败,编排层可以决定是否回退已完成的步骤,而不是让一个全能智能体自己猜该不该撤销。
编排层还负责处理混合型请求——比如"这个表数据异常,修一下",前半段是调查,后半段是增强。编排层先派调查智能体定位问题,拿到诊断结果后再派增强智能体执行修复,两步之间有明确的数据交接。
用代码搭建一个最小多智能体编排骨架
Grab 的生产系统细节未完全公开,但核心模式可以复现。下面是一个用 Python 实现的最小编排骨架,展示调查/增强双流水线 + 编排层的协作逻辑。你可以在此基础上接入 LLM、工具调用和真实的数据平台 API。
import json
from dataclasses import dataclass, field
from typing import Optional
# ── 请求与结果的数据结构 ──────────────────────────────
@dataclass
class SupportRequest:
id: str
description: str
type: str # "investigation" | "enhancement" | "mixed"
context: dict = field(default_factory=dict)
@dataclass
class AgentResult:
agent_name: str
findings: dict = field(default_factory=dict)
actions_taken: list = field(default_factory=list)
success: bool = True
error: Optional[str] = None
# ── 专长智能体 ─────────────────────────────────────────
class InvestigationAgent:
"""调查智能体:只读操作,定位根因"""
def handle(self, request: SupportRequest) -> AgentResult:
# 实际场景中这里会调用 LLM + 工具链查日志/元数据
# 示例:模拟排查管道延迟
findings = {
"root_cause": "pipeline_delay",
"affected_table": request.context.get("table", "unknown"),
"delay_minutes": 45,
"upstream_job": "etl_daily_sales",
}
return AgentResult(
agent_name="investigation",
findings=findings,
)
class EnhancementAgent:
"""增强智能体:执行变更,需要写权限"""
def handle(self, request: SupportRequest, prior_findings: dict = None) -> AgentResult:
# 实际场景中这里会调用平台 API 执行变更
# 示例:模拟修复管道调度
actions = ["rescheduled etl_daily_sales", "updated SLA alert threshold"]
return AgentResult(
agent_name="enhancement",
actions_taken=actions,
findings={"new_schedule": "06:00 UTC", "alert_threshold": "30 min"},
)
# ── 编排层 ──────────────────────────────────────────────
class Orchestrator:
"""调度中枢:判断请求类型,协调多智能体执行"""
def __init__(self):
self.investigation = InvestigationAgent()
self.enhancement = EnhancementAgent()
def classify(self, request: SupportRequest) -> list[str]:
"""根据请求类型决定需要哪些智能体参与"""
mapping = {
"investigation": ["investigation"],
"enhancement": ["enhancement"],
"mixed": ["investigation", "enhancement"],
}
return mapping.get(request.type, ["investigation"])
def orchestrate(self, request: SupportRequest) -> dict:
agents_needed = self.classify(request)
results = []
prior_findings = None
for agent_role in agents_needed:
if agent_role == "investigation":
result = self.investigation.handle(request)
prior_findings = result.findings
elif agent_role == "enhancement":
# 增强智能体可以拿到调查智能体的结论
result = self.enhancement.handle(request, prior_findings)
else:
continue
results.append(result)
# 如果某步失败,编排层决定是否继续
if not result.success:
print(f"[Orchestrator] {agent_role} agent failed: {result.error}")
break
return {
"request_id": request.id,
"agents_used": agents_needed,
"results": [json.dumps(r, default=str) for r in results],
"summary": self._summarize(results),
}
def _summarize(self, results: list[AgentResult]) -> str:
parts = []
for r in results:
if r.agent_name == "investigation":
parts.append(f"根因: {r.findings.get('root_cause', 'unknown')}")
elif r.agent_name == "enhancement":
parts.append(f"已执行: {', '.join(r.actions_taken)}")
return "; ".join(parts) if parts else "no actionable result"
# ── 运行示例 ──────────────────────────────────────────────
if __name__ == "__main__":
orchestrator = Orchestrator()
# 混合型请求:数据异常 + 要求修复
req = SupportRequest(
id="SUP-2024-0891",
description="daily_sales 表数据延迟,请排查并修复",
type="mixed",
context={"table": "daily_sales"},
)
outcome = orchestrator.orchestrate(req)
print(json.dumps(outcome, indent=2, default=str))
运行后输出:
{
"request_id": "SUP-2024-0891",
"agents_used": ["investigation", "enhancement"],
"results": [...],
"summary": "根因: pipeline_delay; 已执行: rescheduled etl_daily_sales, updated SLA alert threshold"
}
改造方向:把 handle 方法里的硬编码逻辑替换为 LLM 工具调用(如 LangChain Agent 或 LangGraph Node),context 字段传入真实平台元数据,EnhancementAgent 的写操作对接数据平台 API 并加上审批门控。
从救火到基建:落地的取舍与检查清单
多智能体系统不是万能药,Grab 的实践也暴露了几道必须跨过的坎:
权限边界是第一道防线。调查智能体只需要读权限,增强智能体需要写权限——但写权限意味着风险。生产环境中增强智能体的每次变更都应该经过审批流或至少有 dry-run 模式,而不是直接执行。编排层可以在分发任务前插入一个人工确认节点,对高风险操作尤其必要。
智能体的专长划分要跟着业务走。Grab 拆出调查和增强两条线,是因为数据仓库支持请求天然分这两类。如果你的平台支持请求有不同分布——比如大量是配置类请求而非诊断类——智能体的划分方式也应该不同,不要照搬这个二分法。
诊断质量决定系统可信度。调查智能体如果经常给出错误根因,增强智能体就会基于错误结论执行变更,后果比人工误操作更难追查。初期可以让调查智能体只生成诊断报告、由人工确认后再触发增强流程,逐步验证准确率后再放开全自动。
落地前的快速检查清单:
- ✅ 支持请求是否可以清晰分为"只读诊断"和"写操作变更"两类?
- ✅ 增强智能体的写操作是否有审批门控或 dry-run 机制?
- ✅ 编排层能否处理混合型请求的两步交接?
- ✅ 调查智能体的诊断准确率是否有度量指标和人工校验流程?
- ✅ 每次智能体变更是否有审计日志和回滚路径?
Grab 的案例证明了一点:多智能体系统的价值不在于替代工程师,而在于把工程师从重复性救火中释放出来,让他们去做只有人才能做好的平台基建决策。架构的关键不是智能体有多聪明,而是调查与增强的拆分是否贴合你的业务,编排层的协调是否可靠,权限边界是否足够清晰。先把这几根柱子立稳,再逐步把 LLM 和工具链接入每个智能体,系统才能真正扛住规模。