凌晨三点,CloudWatch 告警响了。你爬起来打开控制台,发现 CPU 利用率飙升——但到底是哪台实例、哪个服务、哪条日志链路出了问题?你需要跨账号翻指标、查日志、看告警历史,十五分钟后才拼出完整故事。
AgentWatch 把这个流程反过来:不是等告警触发再追查,而是每 15 分钟主动巡检,把 CloudWatch 指标、日志和告警跨账号汇总成一份可操作的报告,直接推到 Slack。你还可以用自然语言问它:"昨晚 prod 账号有没有异常?"它直接回答。
这篇文章拆解 AgentWatch 的实现思路,重点看三个 human-in-the-loop 模式如何在自动化和人工把控之间找到平衡。
每 15 分钟一次的主动巡检
传统监控是"阈值触发→告警→人工排查"。AgentWatch 改成"定时巡检→汇总→推送报告",区别在于你不需要先被告警吓一跳才知道出了事。
核心流程:
- 定时触发——每 15 分钟,agent 自动拉取多个 AWS 账号的 CloudWatch 指标、最近日志条目、当前活跃告警。
- 汇总分析——不是原始数据堆砌,而是生成结构化摘要:哪些指标偏离基线、哪些告警持续未恢复、日志中是否出现高频错误模式。
- 推送 Slack——摘要以可读格式发到指定 Slack channel,包含关键数字和一键跳转链接。
下面是一个用 Python + EventBridge 实现定时巡检触发器的最小示例:
import json
import boto3
import os
SLACK_WEBHOOK = os.environ["SLACK_WEBHOOK"]
ACCOUNT_IDS = os.environ["ACCOUNT_IDS"].split(",") # e.g. "111111111111,222222222222"
cw = boto3.client("cloudwatch")
logs = boto3.client("logs")
alarms = boto3.client("cloudwatch", region_name="us-east-1")
def fetch_metrics(account_id: str) -> dict:
"""拉取指定账号的关键 CloudWatch 指标"""
# 跨账号需要 role assumption,这里简化为直接查询
cpu_util = cw.get_metric_statistics(
Namespace="AWS/EC2",
MetricName="CPUUtilization",
Dimensions=[{"Name": "AccountId", "Value": account_id}],
Period=900, # 15 分钟
Statistics=["Average", "Maximum"],
StartTime="now - 15m", # 实际代码用 datetime 计算
EndTime="now",
)
return {"account": account_id, "cpu": cpu_util}
def fetch_active_alarms() -> list:
"""获取所有处于 ALARM 状态的告警"""
resp = alarms.describe_alarms(StateValue="ALARM")
return [{"name": a["AlarmName"], "state": a["StateValue"]} for a in resp["MetricAlarms"]]
def summarize(accounts: list[str]) -> str:
"""汇总所有账号数据,生成 Slack 消息文本"""
metrics = [fetch_metrics(acc) for acc in accounts]
alarms = fetch_active_alarms()
lines = ["📊 *AgentWatch 15min巡检报告*\n"]
for m in metrics:
avg = m["cpu"]["Datapoints"][0]["Average"] if m["cpu"]["Datapoints"] else 0
max_val = m["cpu"]["Datapoints"][0]["Maximum"] if m["cpu"]["Datapoints"] else 0
lines.append(f"• 账号 `{m['account']}` CPU avg={avg:.1f}% max={max_val:.1f}%")
if alarms:
lines.append("\n🔴 *活跃告警:*")
for a in alarms:
lines.append(f" - {a['name']} ({a['state']})")
else:
lines.append("\n✅ 无活跃告警")
return "\n".join(lines)
def send_to_slack(text: str):
import urllib.request
payload = json.dumps({"text": text}).encode("utf-8")
req = urllib.request.Request(SLACK_WEBHOOK, data=payload, headers={"Content-Type": "application/json"})
urllib.request.urlopen(req)
def lambda_handler(event, context):
report = summarize(ACCOUNT_IDS)
send_to_slack(report)
return {"statusCode": 200, "body": "巡检报告已推送"}
部署定时触发用 EventBridge 规则:
aws events put-rule \
--name "AgentWatch-15min-schedule" \
--schedule-expression "rate(15 minutes)" \
--state ENABLED
aws lambda add-permission \
--function-name AgentWatchSummarizer \
--statement-id EventBridgeInvoke \
--action lambda:InvokeFunction \
--principal events.amazonaws.com \
--source-arn arn:aws:events:us-east-1:111111111111:rule/AgentWatch-15min-schedule
运行前需要替换 ACCOUNT_IDS 和 SLACK_WEBHOOK 为你的实际值。跨账号场景还需要配置 IAM role assumption,上面代码做了简化。
自然语言查询:和基础设施对话
AgentWatch 不只是定时推送,它还接受自然语言提问。比如你在 Slack 里打:
"prod-east 账号过去一小时的 Error 日志有多少?"
agent 会解析意图,调用 CloudWatch Logs 的 filter_log_events,统计 ERROR 级别日志数量,返回具体数字和趋势。
实现上,这通常是一个 LLM + tool-use 的模式。下面展示一个用 OpenAI function calling 风格的工具定义:
import openai
TOOLS = [
{
"type": "function",
"function": {
"name": "query_cloudwatch_logs",
"description": "查询指定 AWS 账号和时间段内的 CloudWatch Logs,按关键词过滤",
"parameters": {
"type": "object",
"properties": {
"account_id": {"type": "string", "description": "AWS 账号 ID"},
"log_group": {"type": "string", "description": "日志组名称,如 /aws/lambda/my-function"},
"filter_pattern": {"type": "string", "description": "CloudWatch Logs 过滤模式,如 ERROR"},
"time_range_minutes": {"type": "integer", "description": "查询最近多少分钟的数据"},
},
"required": ["account_id", "log_group", "filter_pattern", "time_range_minutes"],
},
},
},
{
"type": "function",
"function": {
"name": "list_active_alarms",
"description": "列出指定账号当前处于 ALARM 状态的所有 CloudWatch 告警",
"parameters": {
"type": "object",
"properties": {
"account_id": {"type": "string", "description": "AWS 账号 ID"},
},
"required": ["account_id"],
},
},
},
]
def handle_query(user_message: str) -> str:
response = openai.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": user_message}],
tools=TOOLS,
tool_choice="auto",
)
msg = response.choices[0].message
# 如果模型决定调用工具,执行并返回结果
if msg.tool_calls:
results = []
for call in msg.tool_calls:
args = json.loads(call.function.arguments)
if call.function.name == "query_cloudwatch_logs":
data = execute_log_query(args) # 实际调用 AWS SDK
results.append({"tool_call_id": call.id, "output": json.dumps(data)})
elif call.function.name == "list_active_alarms":
data = execute_alarm_query(args)
results.append({"tool_call_id": call.id, "output": json.dumps(data)})
# 把工具结果喂回模型,让它生成最终回答
followup = openai.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "user", "content": user_message},
msg,
{"role": "tool", "tool_call_id": results[0]["tool_call_id"], "content": results[0]["output"]},
],
)
return followup.choices[0].message.content
return msg.content
关键设计点:工具定义要精确到参数级别,让模型知道它能查什么、怎么查。不要给模型太多自由度——限定 account_id 为必填参数,避免它猜一个不存在的账号。
三种 Human-in-the-Loop 模式
AgentWatch 的核心张力在于:自动化越多,效率越高,但失控风险也越大。文章探讨了三种 oversight 模式,每种对应不同的信任等级。
模式一:审批式——agent 提议,人拍板
适用场景:操作有不可逆风险,比如终止实例、修改安全组规则。
agent 生成一个提议消息推到 Slack:
⚠️ AgentWatch 建议:prod-api-07 的 CPU 连续 3 个周期超过 90%,建议重启实例。
[批准] [拒绝] [查看详情]
人点"批准"才执行。实现上用 Slack interactive message + Lambda callback:
# Slack interactive message payload
action_block = {
"type": "actions",
"elements": [
{"type": "button", "text": {"type": "plain_text", "text": "批准"}, "action_id": "approve_restart", "value": "i-0abc123"},
{"type": "button", "text": {"type": "plain_text", "text": "拒绝"}, "action_id": "reject_restart", "style": "danger", "value": "i-0abc123"},
],
}
Lambda 收到 callback 后根据 action_id 决定是否调用 ec2.reboot_instances。
模式二:通知式——agent 执行,事后告知
适用场景:低风险操作,比如调整 Auto Scaling Group 的 DesiredCapacity 从 2 到 3。
agent 直接执行,然后在 Slack 发一条确认消息:
✅ AgentWatch 已将 asg-prod-api 的 DesiredCapacity 从 2 调整为 3(原因:CPU 平均值持续偏高)
人不需要提前审批,但事后能看到做了什么。如果发现不对,可以手动回滚。
模式三:观察式——agent 只报告,人决定下一步
适用场景:信息收集和趋势分析,不涉及任何变更操作。
agent 每 15 分钟的巡检报告就是这个模式。它只读数据、汇总、推送,不触发任何写操作。这是最安全的模式,也是 AgentWatch 的默认行为。
三种模式的选用原则很直接:写操作越危险,人的介入越早。读操作可以全自动化;低风险写操作可以事后通知;高风险写操作必须事前审批。
落地时的几个坑
跨账号 IAM 配置是最容易卡住的地方。 AgentWatch 要读多个账号的 CloudWatch 数据,每个账号都需要一个允许跨账号访问的 role。建议用一个集中的 AgentWatchReader role,各账号通过 sts:AssumeRole 授权。权限范围只给 cloudwatch:GetMetricStatistics、logs:FilterLogEvents、cloudwatch:DescribeAlarms,不要给宽泛的读权限。
Slack 消息格式要克制。 15 分钟一次的报告如果太长,团队会习惯性忽略。把摘要控制在 10 行以内,关键数字用 emoji 标记,详情用链接跳转到 dashboard。
LLM 工具调用的边界要硬编码。 不要让 agent 自己决定是否执行写操作——在代码层面把写操作的路由强制走审批流程,即使模型"觉得"这个操作很安全。
快速上手清单
- 创建一个 Lambda 函数处理定时巡检和 Slack 推送,配置 EventBridge 15 分钟触发。
- 在每个被监控账号创建
AgentWatchReaderrole,授权只读 CloudWatch 数据。 - 配置 Slack Incoming Webhook 和 Interactive Messages(如果需要审批按钮)。
- 如果启用自然语言查询,部署 LLM tool-use handler,工具定义只暴露读操作;写操作走审批通道。
- 先用观察式模式跑一周,看报告质量;再逐步开放通知式和审批式操作。
AgentWatch 的价值不在于替代你的判断,而是把"被动等告警"变成"主动看全貌"。当凌晨三点的 Slack 消息不再是刺耳的告警,而是已经帮你整理好的摘要,你会知道这个模式跑通了。