SQLite 做 AI 工作流持久化,够用吗?

2026-06-01 32 预计阅读时间: 1 分钟
来源: oschina.net AI 摘要 Original link

Disclaimer: This article is an AI-assisted summary. Read it together with the original source when precision matters. The summary may omit context, version differences, or edge cases and is not official documentation.

预计阅读时间:12 分钟

最近 DBOS 发了一篇博文,核心论点是"Postgres 就是你持久化执行所需的全部"——既然你已经信任你的数据库来存业务数据,何必再引入一层编排引擎?这个观点在社区里引发了不少共鸣。但 Obelisk 团队觉得,这条逻辑链还可以再往前推一步:对一大类持久化场景,SQLite 就够了,连 Postgres 都不必上。

这不是为了省钱而省钱。背后是对 AI 工作流本质特征的重新审视。

AI 工作流到底需要什么"持久"

传统微服务编排(Temporal、Step Functions 那一套)面对的是跨服务、跨天的长事务——订单流转、支付对账,状态要扛住进程崩溃、网络分区、节点重启。这类场景确实需要 Postgres 级别的 WAL、MVCC、网络可达性。

但 AI 工作流不一样。一个典型的 Agent 执行链路:

  1. 接收用户指令
  2. 调 LLM 生成计划
  3. 逐步执行工具调用
  4. 汇总结果返回

整条链路通常在秒级到分钟级完成。真正需要持久化的不是"跨天的分布式事务",而是步骤间的 checkpoint——万一某步失败,能从上一个成功步骤恢复,而不是从头再来重跑一遍 LLM 调用(那可是真金白银的 token 费)。

这意味着:

  • 单进程内的持久化就够了,不需要跨节点共识
  • 写频次低,一次执行几十次写入,不是每秒万级 TPS
  • 读模式简单,按执行 ID 查状态,不需要复杂查询
  • 崩溃恢复是刚需,但恢复窗口是"下次启动时",不是"毫秒级热切换"

这些特征恰好落在 SQLite 的舒适区。

SQLite 的三个被低估的优势

零运维的嵌入式部署

AI 应用最常见的形态是单实例部署——一个 API 服务挂一个 Agent 运行器。SQLite 作为嵌入式数据库,跟着进程走,没有连接池、没有独立进程、没有运维面板。部署复杂度从"应用 + 数据库服务器"降到"应用本身"。

对于快速迭代的 AI 项目,这意味着你不会因为数据库配置、权限、迁移脚本而卡住。sqlite3 文件就在项目目录里,git 能追踪,调试时直接用 sqlite3 命令行打开看。

ACID 事务是标配,不是奢侈品

SQLite 默认开启 WAL 模式后,提供完整的 ACID 事务保证。一个 Agent 步骤的状态更新——从 runningcompleted,附带输出 JSON——在单行事务里完成。进程崩溃?WAL 文件在重启时自动回放,不会丢半个步骤的状态。

这比自己在文件系统上写 JSON checkpoint 文件靠谱得多。文件写入不是原子操作,write + rename 的套路在并发和崩溃场景下都有坑。SQLite 把这些底层细节全包了。

全量数据在本地,调试零摩擦

AI 工作流调试最痛苦的一环是"状态散在各处"。LLM 的中间输出在日志里,工具调用的结果在另一个服务里,步骤状态在编排引擎的数据库里——拼凑一次完整执行轨迹要跨三个系统。

SQLite 把所有步骤状态、中间输出、错误记录都放在一个文件里。调试时一条 SQL 就能拉出完整执行链路:

SELECT step_id, status, output, created_at
FROM workflow_steps
WHERE execution_id = 'abc-123'
ORDER BY step_seq;

这种"数据全在本地"的体验,对快速排查 Agent 行为异常至关重要。

用 SQLite 做 Agent 步骤持久化:一个可跑的例子

下面是一个最小但完整的 Agent 执行器,用 SQLite 做步骤级 checkpoint。每次执行一个步骤前写入 running 状态,完成后更新为 completed 并存输出。进程崩溃后重启,能自动找到最后一个 running 步骤并重试。

先安装依赖:

pip install sqlite3-utils  # 可选,方便初始化;标准库自带 sqlite3

核心代码:

import sqlite3
import json
import time
from pathlib import Path

DB_PATH = Path("agent_state.db")

def init_db():
    """创建步骤状态表,只需运行一次。"""
    conn = sqlite3.connect(str(DB_PATH))
    conn.execute("PRAGMA journal_mode=WAL")  # 开启 WAL,崩溃安全
    conn.execute("""
        CREATE TABLE IF NOT EXISTS workflow_steps (
            execution_id  TEXT NOT NULL,
            step_seq      INTEGER NOT NULL,
            step_id       TEXT NOT NULL,
            status        TEXT NOT NULL DEFAULT 'pending',
            input_json    TEXT,
            output_json   TEXT,
            error_msg     TEXT,
            created_at    REAL NOT NULL,
            updated_at    REAL NOT NULL,
            PRIMARY KEY (execution_id, step_seq)
        )
    """)
    conn.commit()
    conn.close()

def start_step(exec_id: str, seq: int, step_id: str, input_data: dict):
    """步骤开始前写入 running 状态——这是 checkpoint。"""
    conn = sqlite3.connect(str(DB_PATH))
    now = time.time()
    conn.execute("""
        INSERT INTO workflow_steps (execution_id, step_seq, step_id,
                                    status, input_json, created_at, updated_at)
        VALUES (?, ?, ?, 'running', ?, ?, ?)
    """, (exec_id, seq, step_id, json.dumps(input_data), now, now))
    conn.commit()
    conn.close()

def complete_step(exec_id: str, seq: int, output_data: dict):
    """步骤成功完成,更新状态和输出。"""
    conn = sqlite3.connect(str(DB_PATH))
    conn.execute("""
        UPDATE workflow_steps
        SET status = 'completed', output_json = ?, updated_at = ?
        WHERE execution_id = ? AND step_seq = ?
    """, (json.dumps(output_data), time.time(), exec_id, seq))
    conn.commit()
    conn.close()

def fail_step(exec_id: str, seq: int, error: str):
    """步骤失败,记录错误信息。"""
    conn = sqlite3.connect(str(DB_PATH))
    conn.execute("""
        UPDATE workflow_steps
        SET status = 'failed', error_msg = ?, updated_at = ?
        WHERE execution_id = ? AND step_seq = ?
    """, (error, time.time(), exec_id, seq))
    conn.commit()
    conn.close()

def find_last_running(exec_id: str) -> dict | None:
    """崩溃恢复:找到最后一个 running 状态的步骤。"""
    conn = sqlite3.connect(str(DB_PATH))
    row = conn.execute("""
        SELECT step_seq, step_id, input_json
        FROM workflow_steps
        WHERE execution_id = ? AND status = 'running'
        ORDER BY step_seq DESC LIMIT 1
    """, (exec_id,)).fetchone()
    conn.close()
    if row:
        return {"seq": row[0], "step_id": row[1], "input": json.loads(row[2])}
    return None

# ---- 模拟一次 Agent 执行 ----

def run_agent(exec_id: str):
    """带崩溃恢复的 Agent 执行循环。"""
    init_db()

    # 检查是否有未完成的步骤(崩溃恢复场景)
    last_running = find_last_running(exec_id)
    if last_running:
        print(f"恢复执行:从步骤 {last_running['seq']} ({last_running['step_id']}) 重试")
        start_seq = last_running["seq"]
    else:
        start_seq = 0

    steps = [
        ("plan",   {"prompt": "帮我分析这段代码的性能瓶颈"}),
        ("search", {"query": "Python profiling best practices"}),
        ("analyze", {"code": "def process(items): ..."}),
        ("report", {"format": "markdown"}),
    ]

    for i in range(start_seq, len(steps)):
        step_id, input_data = steps[i]
        start_step(exec_id, i, step_id, input_data)

        try:
            # 这里放真实的 LLM / 工具调用逻辑
            # 模拟:假装执行成功
            output = {"result": f"{step_id} 步骤输出(模拟)"}
            complete_step(exec_id, i, output)
            print(f"步骤 {i} ({step_id}) 完成")
        except Exception as e:
            fail_step(exec_id, i, str(e))
            print(f"步骤 {i} ({step_id}) 失败: {e}")
            break  # 失败后停止,下次可从该步恢复

# 运行
run_agent("exec-001")

# 查看完整执行轨迹
conn = sqlite3.connect(str(DB_PATH))
rows = conn.execute("""
    SELECT step_seq, step_id, status, output_json
    FROM workflow_steps WHERE execution_id = 'exec-001'
    ORDER BY step_seq
""").fetchall()
for r in rows:
    print(r)
conn.close()

运行后你会看到每一步的状态都被持久化。如果中途进程被 kill,下次用同一个 execution_idrun_agent,它会自动从崩溃点继续。

这个模式的关键设计:

  • 步骤开始前先写 running,不是完成后才写。这保证了"至少知道哪一步在执行中"。
  • WAL 模式确保崩溃后数据文件不会损坏。
  • 单文件数据库可以随项目一起版本管理,方便回溯调试。

什么时候 SQLite 不够

说"SQLite 就够了",前提是场景匹配。以下情况需要认真考虑 Postgres 或其他方案:

  • 多实例并发写入同一状态:SQLite 的写锁是文件级的,并发写入会排队。如果你的 Agent 服务有多个副本同时更新同一个工作流,SQLite 会成为瓶颈。
  • 状态数据量超过单机磁盘:SQLite 单文件建议控制在几 GB 以内。如果工作流历史数据需要长期保留且量级持续增长,需要 Postgres 的分区和归档能力。
  • 需要实时跨节点查询状态:比如一个监控面板要聚合多个服务实例的工作流状态,SQLite 的数据散在各实例本地,查询要额外搭聚合层。

但回到大多数 AI 工作流的现实:单实例部署、状态数据量可控、调试优先于分布式查询。SQLite 的边界并不容易触碰。

选择清单

在决定用 SQLite 还是 Postgres 做 AI 工作流持久化时,可以快速走一遍这个判断:

问题 选 SQLite 选 Postgres
部署形态 单实例或少量实例 多实例高可用集群
并发写模式 同一工作流单进程串行 多进程并发写同一状态
数据量预期 几 GB 以内 持续增长,需分区归档
运维资源 不想管数据库服务器 有 DBA 或托管服务
调试频率 高,需要随时翻状态 低,主要靠日志
崩溃恢复需求 进程重启时恢复 毫秒级热切换

如果你的答案大部分在左列,SQLite 是更务实的选择——少一层依赖,少一份运维,多一份调试便利。不是"Postgres 不行",而是"这个场景不需要 Postgres 的全部能力"。

持久化系统的选择,归根到底是对需求边界的诚实评估。AI 工作流还在快速演化,今天够用的方案明天可能需要升级——但反过来,为"也许将来需要"提前上全套基础设施,同样是需要付出的成本。SQLite 给了一个低摩擦的起点,让你先跑起来,再根据实际瓶颈决定是否加重量。


相关推荐