AgentWatch:让 AWS 监控从被动告警变成主动巡检

2026-05-27 21 预计阅读时间:1 分钟
来源:aws.amazon.com AI 摘要 原文链接

免责声明:本文为 AI 摘要整理,建议结合原文阅读。摘要可能省略上下文、版本差异或边界条件,不作为官方说明。

预计阅读时间:13 分钟

凌晨三点,CloudWatch 告警响了。你爬起来打开控制台,发现 CPU 利用率飙升——但到底是哪台实例、哪个服务、哪条日志链路出了问题?你需要跨账号翻指标、查日志、看告警历史,十五分钟后才拼出完整故事。

AgentWatch 把这个流程反过来:不是等告警触发再追查,而是每 15 分钟主动巡检,把 CloudWatch 指标、日志和告警跨账号汇总成一份可操作的报告,直接推到 Slack。你还可以用自然语言问它:"昨晚 prod 账号有没有异常?"它直接回答。

这篇文章拆解 AgentWatch 的实现思路,重点看三个 human-in-the-loop 模式如何在自动化和人工把控之间找到平衡。

每 15 分钟一次的主动巡检

传统监控是"阈值触发→告警→人工排查"。AgentWatch 改成"定时巡检→汇总→推送报告",区别在于你不需要先被告警吓一跳才知道出了事。

核心流程:

  1. 定时触发——每 15 分钟,agent 自动拉取多个 AWS 账号的 CloudWatch 指标、最近日志条目、当前活跃告警。
  2. 汇总分析——不是原始数据堆砌,而是生成结构化摘要:哪些指标偏离基线、哪些告警持续未恢复、日志中是否出现高频错误模式。
  3. 推送 Slack——摘要以可读格式发到指定 Slack channel,包含关键数字和一键跳转链接。

下面是一个用 Python + EventBridge 实现定时巡检触发器的最小示例:

import json
import boto3
import os

SLACK_WEBHOOK = os.environ["SLACK_WEBHOOK"]
ACCOUNT_IDS = os.environ["ACCOUNT_IDS"].split(",")  # e.g. "111111111111,222222222222"

cw = boto3.client("cloudwatch")
logs = boto3.client("logs")
alarms = boto3.client("cloudwatch", region_name="us-east-1")


def fetch_metrics(account_id: str) -> dict:
    """拉取指定账号的关键 CloudWatch 指标"""
    # 跨账号需要 role assumption,这里简化为直接查询
    cpu_util = cw.get_metric_statistics(
        Namespace="AWS/EC2",
        MetricName="CPUUtilization",
        Dimensions=[{"Name": "AccountId", "Value": account_id}],
        Period=900,  # 15 分钟
        Statistics=["Average", "Maximum"],
        StartTime="now - 15m",  # 实际代码用 datetime 计算
        EndTime="now",
    )
    return {"account": account_id, "cpu": cpu_util}


def fetch_active_alarms() -> list:
    """获取所有处于 ALARM 状态的告警"""
    resp = alarms.describe_alarms(StateValue="ALARM")
    return [{"name": a["AlarmName"], "state": a["StateValue"]} for a in resp["MetricAlarms"]]


def summarize(accounts: list[str]) -> str:
    """汇总所有账号数据,生成 Slack 消息文本"""
    metrics = [fetch_metrics(acc) for acc in accounts]
    alarms = fetch_active_alarms()

    lines = ["📊 *AgentWatch 15min巡检报告*\n"]
    for m in metrics:
        avg = m["cpu"]["Datapoints"][0]["Average"] if m["cpu"]["Datapoints"] else 0
        max_val = m["cpu"]["Datapoints"][0]["Maximum"] if m["cpu"]["Datapoints"] else 0
        lines.append(f"• 账号 `{m['account']}` CPU avg={avg:.1f}% max={max_val:.1f}%")

    if alarms:
        lines.append("\n🔴 *活跃告警:*")
        for a in alarms:
            lines.append(f"  - {a['name']} ({a['state']})")
    else:
        lines.append("\n✅ 无活跃告警")

    return "\n".join(lines)


def send_to_slack(text: str):
    import urllib.request
    payload = json.dumps({"text": text}).encode("utf-8")
    req = urllib.request.Request(SLACK_WEBHOOK, data=payload, headers={"Content-Type": "application/json"})
    urllib.request.urlopen(req)


def lambda_handler(event, context):
    report = summarize(ACCOUNT_IDS)
    send_to_slack(report)
    return {"statusCode": 200, "body": "巡检报告已推送"}

部署定时触发用 EventBridge 规则:

aws events put-rule \
  --name "AgentWatch-15min-schedule" \
  --schedule-expression "rate(15 minutes)" \
  --state ENABLED

aws lambda add-permission \
  --function-name AgentWatchSummarizer \
  --statement-id EventBridgeInvoke \
  --action lambda:InvokeFunction \
  --principal events.amazonaws.com \
  --source-arn arn:aws:events:us-east-1:111111111111:rule/AgentWatch-15min-schedule

运行前需要替换 ACCOUNT_IDSSLACK_WEBHOOK 为你的实际值。跨账号场景还需要配置 IAM role assumption,上面代码做了简化。

自然语言查询:和基础设施对话

AgentWatch 不只是定时推送,它还接受自然语言提问。比如你在 Slack 里打:

"prod-east 账号过去一小时的 Error 日志有多少?"

agent 会解析意图,调用 CloudWatch Logs 的 filter_log_events,统计 ERROR 级别日志数量,返回具体数字和趋势。

实现上,这通常是一个 LLM + tool-use 的模式。下面展示一个用 OpenAI function calling 风格的工具定义:

import openai

TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "query_cloudwatch_logs",
            "description": "查询指定 AWS 账号和时间段内的 CloudWatch Logs,按关键词过滤",
            "parameters": {
                "type": "object",
                "properties": {
                    "account_id": {"type": "string", "description": "AWS 账号 ID"},
                    "log_group": {"type": "string", "description": "日志组名称,如 /aws/lambda/my-function"},
                    "filter_pattern": {"type": "string", "description": "CloudWatch Logs 过滤模式,如 ERROR"},
                    "time_range_minutes": {"type": "integer", "description": "查询最近多少分钟的数据"},
                },
                "required": ["account_id", "log_group", "filter_pattern", "time_range_minutes"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "list_active_alarms",
            "description": "列出指定账号当前处于 ALARM 状态的所有 CloudWatch 告警",
            "parameters": {
                "type": "object",
                "properties": {
                    "account_id": {"type": "string", "description": "AWS 账号 ID"},
                },
                "required": ["account_id"],
            },
        },
    },
]


def handle_query(user_message: str) -> str:
    response = openai.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": user_message}],
        tools=TOOLS,
        tool_choice="auto",
    )

    msg = response.choices[0].message

    # 如果模型决定调用工具,执行并返回结果
    if msg.tool_calls:
        results = []
        for call in msg.tool_calls:
            args = json.loads(call.function.arguments)
            if call.function.name == "query_cloudwatch_logs":
                data = execute_log_query(args)  # 实际调用 AWS SDK
                results.append({"tool_call_id": call.id, "output": json.dumps(data)})
            elif call.function.name == "list_active_alarms":
                data = execute_alarm_query(args)
                results.append({"tool_call_id": call.id, "output": json.dumps(data)})

        # 把工具结果喂回模型,让它生成最终回答
        followup = openai.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "user", "content": user_message},
                msg,
                {"role": "tool", "tool_call_id": results[0]["tool_call_id"], "content": results[0]["output"]},
            ],
        )
        return followup.choices[0].message.content

    return msg.content

关键设计点:工具定义要精确到参数级别,让模型知道它能查什么、怎么查。不要给模型太多自由度——限定 account_id 为必填参数,避免它猜一个不存在的账号。

三种 Human-in-the-Loop 模式

AgentWatch 的核心张力在于:自动化越多,效率越高,但失控风险也越大。文章探讨了三种 oversight 模式,每种对应不同的信任等级。

模式一:审批式——agent 提议,人拍板

适用场景:操作有不可逆风险,比如终止实例、修改安全组规则。

agent 生成一个提议消息推到 Slack:

⚠️ AgentWatch 建议:prod-api-07 的 CPU 连续 3 个周期超过 90%,建议重启实例。
[批准] [拒绝] [查看详情]

人点"批准"才执行。实现上用 Slack interactive message + Lambda callback:

# Slack interactive message payload
action_block = {
    "type": "actions",
    "elements": [
        {"type": "button", "text": {"type": "plain_text", "text": "批准"}, "action_id": "approve_restart", "value": "i-0abc123"},
        {"type": "button", "text": {"type": "plain_text", "text": "拒绝"}, "action_id": "reject_restart", "style": "danger", "value": "i-0abc123"},
    ],
}

Lambda 收到 callback 后根据 action_id 决定是否调用 ec2.reboot_instances

模式二:通知式——agent 执行,事后告知

适用场景:低风险操作,比如调整 Auto Scaling Group 的 DesiredCapacity 从 2 到 3。

agent 直接执行,然后在 Slack 发一条确认消息:

✅ AgentWatch 已将 asg-prod-api 的 DesiredCapacity 从 2 调整为 3(原因:CPU 平均值持续偏高)

人不需要提前审批,但事后能看到做了什么。如果发现不对,可以手动回滚。

模式三:观察式——agent 只报告,人决定下一步

适用场景:信息收集和趋势分析,不涉及任何变更操作。

agent 每 15 分钟的巡检报告就是这个模式。它只读数据、汇总、推送,不触发任何写操作。这是最安全的模式,也是 AgentWatch 的默认行为。

三种模式的选用原则很直接:写操作越危险,人的介入越早。读操作可以全自动化;低风险写操作可以事后通知;高风险写操作必须事前审批。

落地时的几个坑

跨账号 IAM 配置是最容易卡住的地方。 AgentWatch 要读多个账号的 CloudWatch 数据,每个账号都需要一个允许跨账号访问的 role。建议用一个集中的 AgentWatchReader role,各账号通过 sts:AssumeRole 授权。权限范围只给 cloudwatch:GetMetricStatisticslogs:FilterLogEventscloudwatch:DescribeAlarms,不要给宽泛的读权限。

Slack 消息格式要克制。 15 分钟一次的报告如果太长,团队会习惯性忽略。把摘要控制在 10 行以内,关键数字用 emoji 标记,详情用链接跳转到 dashboard。

LLM 工具调用的边界要硬编码。 不要让 agent 自己决定是否执行写操作——在代码层面把写操作的路由强制走审批流程,即使模型"觉得"这个操作很安全。

快速上手清单

  1. 创建一个 Lambda 函数处理定时巡检和 Slack 推送,配置 EventBridge 15 分钟触发。
  2. 在每个被监控账号创建 AgentWatchReader role,授权只读 CloudWatch 数据。
  3. 配置 Slack Incoming Webhook 和 Interactive Messages(如果需要审批按钮)。
  4. 如果启用自然语言查询,部署 LLM tool-use handler,工具定义只暴露读操作;写操作走审批通道。
  5. 先用观察式模式跑一周,看报告质量;再逐步开放通知式和审批式操作。

AgentWatch 的价值不在于替代你的判断,而是把"被动等告警"变成"主动看全貌"。当凌晨三点的 Slack 消息不再是刺耳的告警,而是已经帮你整理好的摘要,你会知道这个模式跑通了。


相关推荐