Cloudflare 如何搭建统一数据平台 Town Lake 及其上的 AI Agent Skipper

2026-05-28 17 预计阅读时间:1 分钟
来源:blog.cloudflare.com AI 摘要 原文链接

免责声明:本文为 AI 摘要整理,建议结合原文阅读。摘要可能省略上下文、版本差异或边界条件,不作为官方说明。

预计阅读时间:11 分钟

Cloudflare 每天处理全球数十亿请求,日志、指标、事件数据散落在数十个系统中。工程师想查一个问题的根因,往往要跨 S3、ClickHouse、Kafka、Postgres 反复跳转。Town Lake 的出现,就是为了终结这种割裂——把所有分析数据统一到一处,再让 AI Agent Skipper 直接在上面回答问题。

为什么需要 Town Lake

Cloudflare 内部之前的数据格局很典型:每个团队按自己的需求选存储,日志丢 S3,实时指标进 ClickHouse,业务数据留在 Postgres。短期看各取所需,长期看灾难——跨源 JOIN 几乎不可能,数据定义不一致,新人上手要理解十几种查询语法。

Town Lake 的核心目标只有一个:让所有分析数据在同一语义层可查。不是物理上把数据搬到同一个数据库,而是在逻辑层统一 schema、统一访问入口。底层存储仍然可以是异构的——S3 做冷存储、ClickHouse 做热查询——但上层只暴露一个 SQL 接口和一套标准表定义。

这种"逻辑统一、物理异构"的思路,和业界 lakehouse 模式一致:Iceberg/Delta Lake 管表元数据,计算引擎按需选择。Cloudflare 的规模意味着他们必须自建而非直接用现成产品,但架构理念相通。

Town Lake 的架构骨架

从公开信息推断,Town Lake 大致由三层构成:

  • 存储层:S3 存原始日志和冷数据,ClickHouse 存高频查询的热数据。数据以 Iceberg 格式组织,保证 schema evolution 和 time travel 能力。
  • 元数据与语义层:统一 catalog 管理所有表的 schema,无论底层在哪个引擎。这一层还负责权限、数据血缘、质量校验。
  • 查询层:统一 SQL 入口,路由到合适的引擎。简单聚合走 ClickHouse,全表扫描或跨源 JOIN 起 Spark/Flink 等批处理引擎。

关键设计决策是不强迫数据搬家。团队可以继续用自己熟悉的存储,只要在 Town Lake catalog 里注册表定义和位置。这大幅降低了推广阻力。

Skipper:坐在数据之上的 AI Agent

有了统一语义层,AI Agent 才有可靠的工作基础。Skipper 做的事很直接:工程师用自然语言提问,Skipper 翻译成 SQL,在 Town Lake 上执行,再把结果组织成人类可读的回答。

Skipper 的能力边界很清晰——它不是通用聊天机器人,而是限定在数据分析领域的 agent。这种限定带来几个好处:

  • schema 确定性:Town Lake 的统一 catalog 让 LLM 知道有哪些表、哪些字段、字段含义是什么,生成 SQL 时不会瞎猜。
  • 结果可验证:SQL 本身就是可审查的中间产物,工程师可以看 Skipper 生成的查询是否合理,再决定是否信任结论。
  • 权限继承:Skipper 用提问者的身份执行查询,天然继承 Town Lake 的权限体系,不会越权。

实践:搭建一个最小化的"Town Lake + Agent"原型

Cloudflare 的系统是大规模自建,但核心思路可以缩小到任何团队。下面用一个可运行的原型演示:用 DuckDB 做统一查询引擎(它天然支持多源查询),用 Python + OpenAI API 做一个 Skipper 式的分析 agent。

第一步:用 DuckDB 统一多源数据

import duckdb

con = duckdb.connect()

# 注册一个 Parquet 文件作为表(模拟 S3 上的冷数据)
con.execute("""
    CREATE VIEW request_logs AS
    SELECT * FROM read_parquet('data/request_logs.parquet')
""")

# 注册一个 PostgreSQL 远端表(模拟业务数据库)
# 需先安装 duckdb postgres 扩展
con.execute("INSTALL postgres; LOAD postgres;")
con.execute("""
    CREATE VIEW user_accounts AS
    SELECT * FROM postgres_scan(
        'host=localhost port=5432 dbname=myapp user=postgres password=secret',
        'public', 'accounts'
    )
""")

# 注册一个 CSV(模拟第三方导出数据)
con.execute("""
    CREATE VIEW cdn_metrics AS
    SELECT * FROM read_csv_auto('data/cdn_metrics.csv')
""")

# 现在可以跨源 JOIN
result = con.execute("""
    SELECT r.status_code, COUNT(*) AS cnt
    FROM request_logs r
    JOIN user_accounts u ON r.user_id = u.id
    WHERE u.tier = 'pro'
    GROUP BY r.status_code
    ORDER BY cnt DESC
""").fetchall()
print(result)

DuckDB 的 read_parquetpostgres_scanread_csv_auto 就是你的"逻辑统一层"——数据不用搬家,但 SQL 入口只有一个。

第二步:构建 Skipper 式的 Agent

import json
import duckdb
from openai import OpenAI

client = OpenAI()  # 需设置 OPENAI_API_KEY 环境变量
con = duckdb.connect()

# 先注册同样的视图(同上,此处省略重复代码)
# ...

# 从 DuckDB 拿 schema 信息,作为 LLM 的上下文
def get_schema_info(conn):
    tables = conn.execute("SHOW ALL TABLES").fetchall()
    schema_text = ""
    for t in tables:
        name = t[0]
        cols = conn.execute(f"DESCRIBE {name}").fetchall()
        schema_text += f"\n{name}:\n"
        for c in cols:
            schema_text += f"  - {c[0]} ({c[1]})\n"
    return schema_text

SCHEMA = get_schema_info(con)

SYSTEM_PROMPT = f"""你是数据分析助手 Skipper。用户会用自然语言提问,你需要:
1. 生成一条 DuckDB SQL 查询
2. 只使用以下已知表和字段,不要猜测不存在的字段:
{SCHEMA}
3. 输出格式为 JSON:{{"sql": "...", "explanation": "简要解释查询逻辑"}}
4. 如果问题无法用现有表回答,说明缺少什么数据。
"""

def ask_skipper(question: str) -> dict:
    resp = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": question}
        ],
        response_format={"type": "json_object"},
        temperature=0
    )
    parsed = json.loads(resp.choices[0].message.content)
    sql = parsed["sql"]
    explanation = parsed["explanation"]

    print(f"🔍 Skipper 解释:{explanation}")
    print(f"📝 生成的 SQL:\n{sql}")

    try:
        rows = con.execute(sql).fetchall()
        cols = [desc[0] for desc in con.description]
        print(f"\n📊 查询结果({len(rows)} 行):")
        for row in rows[:20]:
            print(dict(zip(cols, row)))
        return {"sql": sql, "rows": rows, "columns": cols}
    except Exception as e:
        print(f"❌ SQL 执行失败:{e}")
        return {"sql": sql, "error": str(e)}

# 使用示例
ask_skipper("过去一周 pro 用户最常见的 5 个 HTTP 状态码是什么?")

运行前需要准备:

  1. data/request_logs.parquetdata/cdn_metrics.csv——可以用 DuckDB 直接生成测试数据:
import duckdb
con = duckdb.connect()
con.execute("""
    COPY (SELECT
        range AS id,
        CASE WHEN range % 3 = 0 THEN 'pro' WHEN range % 3 = 1 THEN 'free' ELSE 'enterprise' END AS tier,
        'user' || range::VARCHAR AS name
      FROM range(1, 100))
    TO 'data/user_accounts.parquet'
""")
con.execute("""
    COPY (SELECT
        range AS log_id,
        range % 50 AS user_id,
        CASE WHEN range % 5 = 0 THEN 500 WHEN range % 4 = 0 THEN 404 ELSE 200 END AS status_code,
        DATE '2024-01-01' + (range % 7) AS date
      FROM range(1, 1000))
    TO 'data/request_logs.parquet'
""")
  1. 设置 OPENAI_API_KEY 环境变量。

这个原型展示了 Skipper 的核心循环:schema 约束 → SQL 生成 → 执行验证 → 结果呈现。Cloudflare 的生产版当然更复杂——多轮对话、自动纠错、权限注入、查询缓存——但骨架一样。

从原型到生产:几条关键决策

统一语义层是前提,不是可选项。 没有 catalog,LLM 就在黑暗中摸索字段名,生成的 SQL 不可靠。Cloudflare 花了大量精力在 schema 标准化和文档上,这比选哪个 LLM 更重要。

Agent 要暴露中间产物。 Skipper 不直接给答案,而是先给 SQL。这不仅是信任问题,更是工程实践——SQL 可以被人类审查、复用、优化,变成团队的知识积累。纯黑箱回答做不到这一点。

权限必须继承数据层的规则。 Agent 用提问者的身份执行,而不是用一个超级账号。这避免了"AI 能看到所有数据"的安全灾难。

冷热分层仍然必要。 统一入口不代表统一存储。高频查询走 ClickHouse,全量扫描走批处理引擎,成本和性能才能兼顾。DuckDB 原型里用 read_parquet 直接扫文件,生产环境需要更精细的路由。

迭代推广而非一次性迁移。 Town Lake 不强迫团队搬家,只要求注册表定义。这让推广变成增量过程——先统一查询入口,再逐步迁移存储。Skipper 也是先服务最痛的场景(日志分析),再扩展到其他领域。


如果你团队正在经历"数据到处都是、查什么都慢"的阶段,Town Lake 的思路值得借鉴:先建语义层,再选引擎,最后放 Agent。顺序反了,Agent 就建在沙子上。


相关推荐