用多 Agent 系统接管工程支持:Grab 数据仓库平台的实践

2026-05-20 30 预计阅读时间:1 分钟
来源:infoq.com AI 摘要 原文链接

免责声明:本文为 AI 摘要整理,建议结合原文阅读。摘要可能省略上下文、版本差异或边界条件,不作为官方说明。

预计阅读时间:13 分钟

数据仓库平台团队最头疼的事不是写新功能,而是日复一日回答"这个查询为什么慢""表为什么没更新""权限怎么配"。Grab 的 Central Data Team 也被这类重复性支持请求拖住了手脚,直到他们用多 Agent AI 系统把调查和优化两类工作拆开、交给专门的 Agent 处理,才把工程师的时间从"灭火"拉回到真正的平台建设上。

问题:工程支持请求的规模陷阱

Grab 的数据仓库平台服务大量内部团队,支持请求的典型特征:

  • 高频重复——同样的权限问题、数据延迟告警、查询失败排查,每周出现几十次。
  • 上下文分散——一个问题可能涉及 Metastore、调度系统、存储层、权限服务多个子系统。
  • 人工瓶颈——资深工程师处理一个中等复杂度的调查请求平均需要 30–60 分钟,而新人往往更慢且容易遗漏关键线索。

当请求量随业务增长线性上升,人力处理就变成不可持续的方案。单 Agent LLM 调用虽然能回答简单问题,但在需要多步推理、跨系统取证的场景下容易"跑偏"或遗漏步骤。Grab 的选择是:把任务拆给多个专职 Agent,由一个编排层协调它们的工作顺序和信息传递。

架构拆解:调查 Agent 与增强 Agent 分工

Grab 系统的核心设计决策是把工程支持工作分成两条截然不同的工作流:

工作流 职责 典型任务
Investigation(调查) 定位问题根因 查日志、比对配置、追踪数据血缘、确认权限状态
Enhancement(增强) 执行改进动作 生成优化建议、起草配置变更、编写修复脚本、更新文档

为什么要拆开?因为调查需要的是搜索与推理——在大量系统信息中找到关键证据链;增强需要的是生成与执行——基于结论产出可操作的方案。一个 Agent 同时做两件事,容易在"还没查清楚就开始写方案"的陷阱里浪费 token 并产出不可靠结果。

每个 Agent 内部还会进一步细分角色,例如调查流可能包含:

  • Log Analyst Agent:解析任务执行日志,提取异常时间点和错误码。
  • Metadata Inspector Agent:查询 Metastore,确认表结构、分区状态、最近更新时间。
  • Dependency Tracer Agent:沿数据血缘向上追溯,判断是否上游延迟导致下游失败。

增强流则可能包含:

  • Config Advisor Agent:基于调查结论,生成参数调整建议。
  • Script Generator Agent:产出可执行的修复 SQL 或调度配置片段。
  • Doc Updater Agent:将此次问题和方案沉淀到知识库。

编排层:让 Agent 之间不"打架"

多 Agent 系统最大的工程挑战不是单个 Agent 的能力,而是它们之间的协调。Grab 使用了一个独立的 Orchestration Layer 来解决三个问题:

  1. 任务路由——收到请求后,判断该走调查流、增强流还是两者串联。
  2. 上下文传递——调查 Agent 的结论必须结构化地传给增强 Agent,而不是丢一段自然语言文本让对方自行理解。
  3. 失败回退——某个 Agent 超时或返回低置信度结果时,编排层决定是重试、换路径还是升级给人工。

编排层本质上是一个状态机,每个请求的生命周期被建模为一系列阶段转换。下面用一个简化示例展示这种编排思路。

实践示例:用 Python 搭一个最小多 Agent 编排框架

这个示例不还原 Grab 的完整实现(其内部集成了很多 Grab 特有的系统),而是展示"调查 + 增强 + 编排"的核心模式,你可以在此基础上接入自己的工具和 LLM。

import json
import time
from dataclasses import dataclass, field
from typing import Optional

# ── 1. 定义结构化上下文(Agent 之间传递信息的契约)──

@dataclass
class InvestigationResult:
    root_cause: str
    evidence: list[str] = field(default_factory=list)
    confidence: float = 0.0  # 0-1
    needs_human: bool = False

@dataclass
class EnhancementPlan:
    actions: list[str] = field(default_factory=list)
    generated_script: str = ""
    doc_update: str = ""

# ── 2. Agent 基类 ──

class Agent:
    def __init__(self, name: str):
        self.name = name

    def run(self, task: str, context: dict) -> dict:
        """子类实现具体逻辑,这里用 mock 示意"""
        raise NotImplementedError

# ── 3. 调查流 Agent ──

class LogAnalystAgent(Agent):
    """模拟:从日志中提取异常信息"""
    def run(self, task: str, context: dict) -> dict:
        # 实际实现会调用日志系统 API + LLM 分析
        return {
            "agent": self.name,
            "error_codes": ["TIMEOUT", "PARTITION_MISSING"],
            "failed_at": "2024-06-12T03:15:00Z",
            "raw_log_snippet": "Task X failed: partition 2024-06-11 not found in table Y"
        }

class MetadataInspectorAgent(Agent):
    """模拟:检查 Metastore 中的表/分区状态"""
    def run(self, task: str, context: dict) -> dict:
        log_result = context.get("log_analysis", {})
        table_name = "Y"  # 实际应从 log_result 中提取
        return {
            "agent": self.name,
            "table": table_name,
            "last_partition": "2024-06-10",
            "expected_partition": "2024-06-11",
            "partition_gap_detected": True
        }

# ── 4. 增强流 Agent ──

class ScriptGeneratorAgent(Agent):
    """模拟:基于调查结论生成修复脚本"""
    def run(self, task: str, context: dict) -> dict:
        inv_result = context.get("investigation_result")
        if not inv_result or inv_result.confidence < 0.6:
            return {"agent": self.name, "skip_reason": "low confidence, defer to human"}

        # 实际实现会用 LLM 生成针对具体系统的脚本
        return {
            "agent": self.name,
            "fix_script": f"-- Add missing partition for {inv_result.root_cause}\n"
                          f"ALTER TABLE Y ADD PARTITION (ds='2024-06-11') "
                          f"LOCATION 's3://bucket/Y/ds=2024-06-11/';",
            "estimated_risk": "low"
        }

# ── 5. 编排层 ──

class Orchestrator:
    def __init__(self):
        self.log_agent = LogAnalystAgent("log-analyst")
        self.meta_agent = MetadataInspectorAgent("meta-inspector")
        self.script_agent = ScriptGeneratorAgent("script-generator")

    def handle_request(self, request: str) -> dict:
        context = {"original_request": request}

        # ── Phase 1: Investigation ──
        print(f"[Orchestrator] Starting investigation for: {request}")

        log_result = self.log_agent.run(request, context)
        context["log_analysis"] = log_result

        meta_result = self.meta_agent.run(request, context)
        context["meta_analysis"] = meta_result

        # 综合调查结论
        inv_result = InvestigationResult(
            root_cause=f"Partition gap: {meta_result['last_partition']} -> "
                       f"{meta_result['expected_partition']} missing",
            evidence=[
                log_result["raw_log_snippet"],
                f"Metastore shows last_partition={meta_result['last_partition']}"
            ],
            confidence=0.85 if meta_result["partition_gap_detected"] else 0.3,
            needs_human=not meta_result["partition_gap_detected"]
        )
        context["investigation_result"] = inv_result

        # ── 决策门:置信度不够则升级人工 ──
        if inv_result.needs_human or inv_result.confidence < 0.5:
            print("[Orchestrator] Low confidence — escalating to human.")
            return {
                "status": "escalated",
                "investigation": inv_result,
                "reason": "confidence below threshold"
            }

        # ── Phase 2: Enhancement ──
        print("[Orchestrator] Confidence OK — proceeding to enhancement.")
        script_result = self.script_agent.run(request, context)
        context["script_result"] = script_result

        enhancement = EnhancementPlan(
            actions=["add_missing_partition"],
            generated_script=script_result["fix_script"],
            doc_update=f"Root cause: {inv_result.root_cause}. Fix: add partition."
        )

        return {
            "status": "resolved_by_agent",
            "investigation": inv_result,
            "enhancement": enhancement,
            "requires_human_review": script_result.get("estimated_risk") != "low"
        }

# ── 6. 运行 ──

if __name__ == "__main__":
    orch = Orchestrator()
    result = orch.handle_request(
        "数据表 Y 的 6月12日 ETL 任务失败,请排查并修复"
    )
    print(json.dumps(result, default=lambda o: o.__dict__, indent=2, ensure_ascii=False))

运行输出(简化):

{
  "status": "resolved_by_agent",
  "investigation": {
    "root_cause": "Partition gap: 2024-06-10 -> 2024-06-11 missing",
    "evidence": ["...log snippet...", "Metastore shows last_partition=2024-06-10"],
    "confidence": 0.85,
    "needs_human": false
  },
  "enhancement": {
    "actions": ["add_missing_partition"],
    "generated_script": "ALTER TABLE Y ADD PARTITION ...",
    "doc_update": "Root cause: Partition gap..."
  },
  "requires_human_review": false
}

改造要点

  • 把每个 Agent 的 run 方法替换为真实调用——日志系统 API、Metastore 查询接口、LLM 生成调用。
  • InvestigationResultconfidence 可以由 LLM 自评 + 规则校验双重计算。
  • 编排层可以扩展为异步执行,用消息队列解耦 Agent 之间的调用。
  • 生产环境务必加上审计日志和人工审批门(尤其是增强流要执行写操作时)。

效果与取舍

Grab 报告的收益集中在三方面:

  • 运营负载下降——重复性调查请求被 Agent 自动完成,工程师不再逐条手动排查。
  • 解决速度提升——Agent 并行取证比人串行翻系统快,平均响应时间显著缩短。 -时间重新分配——工程师从灭火模式切换到平台改进模式,把精力投入到预防性工作。

但多 Agent 系统也有不容忽视的成本:

成本 说明
编排复杂度 Agent 数量增加后,状态机分支和异常路径会快速膨胀,需要清晰的失败策略
LLM 不确定性 每个 Agent 内部如果依赖 LLM 推理,输出不稳定会传导到下游 Agent
上下文丢失风险 Agent 之间传递结构化信息时,设计不当会导致关键线索被截断
安全边界 增强流 Agent 生成脚本/配置后,自动执行还是人工审批?Grab 选择需要审批

上手清单

如果你也在考虑用多 Agent 系统处理工程支持,建议按这个顺序推进:

  1. 先统计请求类型——把过去 3 个月的支持请求分类,找出占比最高的 3–5 种重复模式,它们才是 Agent 化的首选目标。
  2. 先做调查流——增强流涉及写操作,风险更高;调查流只读,可以先上线验证 Agent 的推理质量。
  3. 定义结构化上下文契约——Agent 之间传递的信息必须用 schema 约束(如上面的 InvestigationResult),不要依赖自然语言传递关键结论。
  4. 设置置信度门槛——低于门槛的结论必须升级人工,不要让低质量推理进入增强流。
  5. 增强流必须有人工审批——生成的脚本和配置变更在执行前需要人工确认,至少在初期阶段。
  6. 逐步增加 Agent——从 2–3 个 Agent 开始,验证编排逻辑稳定后再拆分更多专职角色。

Grab 的实践说明,多 Agent 系统在工程支持场景的价值不是"让 AI 替代工程师",而是把重复性调查和标准化修复从人的待办列表里移走,让工程师专注于只有人才能做好的架构决策和平台演进。


相关推荐