Cloudflare 每天处理全球数十亿请求,日志、指标、事件数据散落在数十个系统中。工程师想查一个问题的根因,往往要跨 S3、ClickHouse、Kafka、Postgres 反复跳转。Town Lake 的出现,就是为了终结这种割裂——把所有分析数据统一到一处,再让 AI Agent Skipper 直接在上面回答问题。
为什么需要 Town Lake
Cloudflare 内部之前的数据格局很典型:每个团队按自己的需求选存储,日志丢 S3,实时指标进 ClickHouse,业务数据留在 Postgres。短期看各取所需,长期看灾难——跨源 JOIN 几乎不可能,数据定义不一致,新人上手要理解十几种查询语法。
Town Lake 的核心目标只有一个:让所有分析数据在同一语义层可查。不是物理上把数据搬到同一个数据库,而是在逻辑层统一 schema、统一访问入口。底层存储仍然可以是异构的——S3 做冷存储、ClickHouse 做热查询——但上层只暴露一个 SQL 接口和一套标准表定义。
这种"逻辑统一、物理异构"的思路,和业界 lakehouse 模式一致:Iceberg/Delta Lake 管表元数据,计算引擎按需选择。Cloudflare 的规模意味着他们必须自建而非直接用现成产品,但架构理念相通。
Town Lake 的架构骨架
从公开信息推断,Town Lake 大致由三层构成:
- 存储层:S3 存原始日志和冷数据,ClickHouse 存高频查询的热数据。数据以 Iceberg 格式组织,保证 schema evolution 和 time travel 能力。
- 元数据与语义层:统一 catalog 管理所有表的 schema,无论底层在哪个引擎。这一层还负责权限、数据血缘、质量校验。
- 查询层:统一 SQL 入口,路由到合适的引擎。简单聚合走 ClickHouse,全表扫描或跨源 JOIN 起 Spark/Flink 等批处理引擎。
关键设计决策是不强迫数据搬家。团队可以继续用自己熟悉的存储,只要在 Town Lake catalog 里注册表定义和位置。这大幅降低了推广阻力。
Skipper:坐在数据之上的 AI Agent
有了统一语义层,AI Agent 才有可靠的工作基础。Skipper 做的事很直接:工程师用自然语言提问,Skipper 翻译成 SQL,在 Town Lake 上执行,再把结果组织成人类可读的回答。
Skipper 的能力边界很清晰——它不是通用聊天机器人,而是限定在数据分析领域的 agent。这种限定带来几个好处:
- schema 确定性:Town Lake 的统一 catalog 让 LLM 知道有哪些表、哪些字段、字段含义是什么,生成 SQL 时不会瞎猜。
- 结果可验证:SQL 本身就是可审查的中间产物,工程师可以看 Skipper 生成的查询是否合理,再决定是否信任结论。
- 权限继承:Skipper 用提问者的身份执行查询,天然继承 Town Lake 的权限体系,不会越权。
实践:搭建一个最小化的"Town Lake + Agent"原型
Cloudflare 的系统是大规模自建,但核心思路可以缩小到任何团队。下面用一个可运行的原型演示:用 DuckDB 做统一查询引擎(它天然支持多源查询),用 Python + OpenAI API 做一个 Skipper 式的分析 agent。
第一步:用 DuckDB 统一多源数据
import duckdb
con = duckdb.connect()
# 注册一个 Parquet 文件作为表(模拟 S3 上的冷数据)
con.execute("""
CREATE VIEW request_logs AS
SELECT * FROM read_parquet('data/request_logs.parquet')
""")
# 注册一个 PostgreSQL 远端表(模拟业务数据库)
# 需先安装 duckdb postgres 扩展
con.execute("INSTALL postgres; LOAD postgres;")
con.execute("""
CREATE VIEW user_accounts AS
SELECT * FROM postgres_scan(
'host=localhost port=5432 dbname=myapp user=postgres password=secret',
'public', 'accounts'
)
""")
# 注册一个 CSV(模拟第三方导出数据)
con.execute("""
CREATE VIEW cdn_metrics AS
SELECT * FROM read_csv_auto('data/cdn_metrics.csv')
""")
# 现在可以跨源 JOIN
result = con.execute("""
SELECT r.status_code, COUNT(*) AS cnt
FROM request_logs r
JOIN user_accounts u ON r.user_id = u.id
WHERE u.tier = 'pro'
GROUP BY r.status_code
ORDER BY cnt DESC
""").fetchall()
print(result)
DuckDB 的 read_parquet、postgres_scan、read_csv_auto 就是你的"逻辑统一层"——数据不用搬家,但 SQL 入口只有一个。
第二步:构建 Skipper 式的 Agent
import json
import duckdb
from openai import OpenAI
client = OpenAI() # 需设置 OPENAI_API_KEY 环境变量
con = duckdb.connect()
# 先注册同样的视图(同上,此处省略重复代码)
# ...
# 从 DuckDB 拿 schema 信息,作为 LLM 的上下文
def get_schema_info(conn):
tables = conn.execute("SHOW ALL TABLES").fetchall()
schema_text = ""
for t in tables:
name = t[0]
cols = conn.execute(f"DESCRIBE {name}").fetchall()
schema_text += f"\n表 {name}:\n"
for c in cols:
schema_text += f" - {c[0]} ({c[1]})\n"
return schema_text
SCHEMA = get_schema_info(con)
SYSTEM_PROMPT = f"""你是数据分析助手 Skipper。用户会用自然语言提问,你需要:
1. 生成一条 DuckDB SQL 查询
2. 只使用以下已知表和字段,不要猜测不存在的字段:
{SCHEMA}
3. 输出格式为 JSON:{{"sql": "...", "explanation": "简要解释查询逻辑"}}
4. 如果问题无法用现有表回答,说明缺少什么数据。
"""
def ask_skipper(question: str) -> dict:
resp = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": question}
],
response_format={"type": "json_object"},
temperature=0
)
parsed = json.loads(resp.choices[0].message.content)
sql = parsed["sql"]
explanation = parsed["explanation"]
print(f"🔍 Skipper 解释:{explanation}")
print(f"📝 生成的 SQL:\n{sql}")
try:
rows = con.execute(sql).fetchall()
cols = [desc[0] for desc in con.description]
print(f"\n📊 查询结果({len(rows)} 行):")
for row in rows[:20]:
print(dict(zip(cols, row)))
return {"sql": sql, "rows": rows, "columns": cols}
except Exception as e:
print(f"❌ SQL 执行失败:{e}")
return {"sql": sql, "error": str(e)}
# 使用示例
ask_skipper("过去一周 pro 用户最常见的 5 个 HTTP 状态码是什么?")
运行前需要准备:
data/request_logs.parquet和data/cdn_metrics.csv——可以用 DuckDB 直接生成测试数据:
import duckdb
con = duckdb.connect()
con.execute("""
COPY (SELECT
range AS id,
CASE WHEN range % 3 = 0 THEN 'pro' WHEN range % 3 = 1 THEN 'free' ELSE 'enterprise' END AS tier,
'user' || range::VARCHAR AS name
FROM range(1, 100))
TO 'data/user_accounts.parquet'
""")
con.execute("""
COPY (SELECT
range AS log_id,
range % 50 AS user_id,
CASE WHEN range % 5 = 0 THEN 500 WHEN range % 4 = 0 THEN 404 ELSE 200 END AS status_code,
DATE '2024-01-01' + (range % 7) AS date
FROM range(1, 1000))
TO 'data/request_logs.parquet'
""")
- 设置
OPENAI_API_KEY环境变量。
这个原型展示了 Skipper 的核心循环:schema 约束 → SQL 生成 → 执行验证 → 结果呈现。Cloudflare 的生产版当然更复杂——多轮对话、自动纠错、权限注入、查询缓存——但骨架一样。
从原型到生产:几条关键决策
统一语义层是前提,不是可选项。 没有 catalog,LLM 就在黑暗中摸索字段名,生成的 SQL 不可靠。Cloudflare 花了大量精力在 schema 标准化和文档上,这比选哪个 LLM 更重要。
Agent 要暴露中间产物。 Skipper 不直接给答案,而是先给 SQL。这不仅是信任问题,更是工程实践——SQL 可以被人类审查、复用、优化,变成团队的知识积累。纯黑箱回答做不到这一点。
权限必须继承数据层的规则。 Agent 用提问者的身份执行,而不是用一个超级账号。这避免了"AI 能看到所有数据"的安全灾难。
冷热分层仍然必要。 统一入口不代表统一存储。高频查询走 ClickHouse,全量扫描走批处理引擎,成本和性能才能兼顾。DuckDB 原型里用 read_parquet 直接扫文件,生产环境需要更精细的路由。
迭代推广而非一次性迁移。 Town Lake 不强迫团队搬家,只要求注册表定义。这让推广变成增量过程——先统一查询入口,再逐步迁移存储。Skipper 也是先服务最痛的场景(日志分析),再扩展到其他领域。
如果你团队正在经历"数据到处都是、查什么都慢"的阶段,Town Lake 的思路值得借鉴:先建语义层,再选引擎,最后放 Agent。顺序反了,Agent 就建在沙子上。