Cloudflare Workflows V2:确定性重放执行与 5 万并发的工作流引擎

2026-05-15 34 预计阅读时间:1 分钟
来源:infoq.com AI 摘要 原文链接

免责声明:本文为 AI 摘要整理,建议结合原文阅读。摘要可能省略上下文、版本差异或边界条件,不作为官方说明。

预计阅读时间:11 分钟

分布式工作流编排一直有个让人头疼的问题:步骤跑了一半,网络抖了、实例重启了,接下来怎么办?从头重跑浪费已完成的工作,从断点续跑又得自己维护状态——两种路都不省心。Cloudflare Workflows V2 直接把这个问题当作核心设计约束来解决:确定性重放执行(deterministic replayable execution),加上 50,000 并发实例、2,000,000 排队工作流的承载能力,让 AI Agent、数据管道、后台批处理这类长链路任务在边缘网络上跑得更稳。

确定性重放:不是"重试",是"重放"

传统做法里,工作流失败后靠重试策略补救——最多重试 N 次、间隔递增。但重试有两个盲区:

  • 副作用重复执行:第一步已经成功调了支付接口,重试时再调一次就出问题。
  • 状态不一致:重试拿到的输入可能和首次执行时不同(比如数据库里记录已经变了)。

V2 的确定性重放思路不一样。每个步骤的输入和输出都被持久化记录,重放时引擎跳过已完成的步骤,直接用记录的输出喂给下一步。这更像是"从快照恢复"而非"从头再来"。

对开发者来说,写工作流代码时不需要自己操心持久化——引擎替你做了。你只需要把业务逻辑拆成明确的步骤,每一步的返回值就是下一步的入参,中间状态自动可靠。

规模:5 万并发意味着什么

50,000 并发实例 + 2,000,000 排队工作流,这个数字放在边缘网络语境下有几个实际意义:

  1. AI Agent 不再排队等槽位:一个 Agent 可能同时调用搜索、推理、工具执行等多个子步骤,每个 Agent 实例就是一个工作流。5 万并发意味着大规模 Agent 部署不会因为编排层瓶颈而卡住。
  2. 数据管道可以跑得更宽:ETL、日志聚合这类任务天然是高并发短生命周期,2M 排队容量让突发流量不至于丢失任务。
  3. 后台处理从"尽力而为"变成"必达":邮件发送、通知推送、报表生成——这些原来靠 cron + 消息队列勉强拼凑的场景,现在有了一个自带持久化和重放的统一方案。

可观测性:长链路任务的"黑盒"终于开了窗

长工作流最痛苦的不是写代码,而是出了问题不知道卡在哪一步。V2 改进了可观测性:每个步骤的执行状态、耗时、输入输出都可以追踪。结合 Cloudflare Dashboard 和 API,你能看到:

  • 哪个步骤失败、失败原因是什么
  • 重放时跳过了哪些步骤、实际执行了哪些
  • 单个工作流的完整执行时间线

这比自己在日志里拼时间戳靠谱得多。

实战:用 Workflows V2 写一个带重放的 AI Agent 工作流

下面是一个可以直接部署到 Cloudflare Workers 的 Workflows V2 示例。场景:一个 AI Agent 先搜索资料,再调用 LLM 总结,最后把结果写入 KV 存储。每一步都有明确的输入输出,引擎自动持久化,失败后重放时已完成步骤不会重复执行。

// worker.ts — Cloudflare Workers + Workflows V2 入口
import {
  WorkflowEntrypoint,
  WorkflowEvent,
  WorkflowStep,
} from "cloudflare:workers";

interface Env {
  AI: any;           // Cloudflare Workers AI binding
  KV: KVNamespace;   // KV 存储绑定
  WORKFLOW: Workflow; // Workflows binding
}

// 定义工作流参数
type Params = {
  topic: string;
};

// 工作流类——每个 step 的返回值自动持久化,重放时跳过已完成步骤
class AgentWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
    // 步骤 1:搜索相关资料(模拟调用搜索 API)
    const searchResults = await step.do("search", {
      retries: { max: 3, delay: "5s" },
    }, async () => {
      // 这里替换成实际搜索 API 调用
      const resp = await fetch(
        `https://api.example.com/search?q=${encodeURIComponent(event.payload.topic)}`
      );
      if (!resp.ok) throw new Error(`Search failed: ${resp.status}`);
      const data = await resp.json() as { snippets: string[] };
      return data.snippets;
    });

    // 步骤 2:用 LLM 总结搜索结果
    const summary = await step.do("summarize", {
      retries: { max: 2, delay: "10s" },
    }, async () => {
      const prompt = `基于以下资料,用中文写一段关于"${event.payload.topic}"的简要总结:\n\n${searchResults.join("\n")}`;
      const { response } = await this.env.AI.run("@cf/meta/llama-3.1-8b-instruct", {
        messages: [{ role: "user", content: prompt }],
      });
      return response as string;
    });

    // 步骤 3:写入 KV 存储
    await step.do("store", async () => {
      await this.env.KV.put(
        `summary:${event.payload.topic}`,
        JSON.stringify({ topic: event.payload.topic, summary, createdAt: Date.now() })
      );
    });

    return { topic: event.payload.topic, summary };
  }
}

export default {
  // HTTP 入口:触发工作流
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);
    if (url.pathname === "/trigger" && request.method === "POST") {
      const { topic } = await request.json() as Params;
      if (!topic) return new Response("Missing topic", { status: 400 });

      // 创建工作流实例
      const instance = await env.WORKFLOW.create({ params: { topic } });
      return Response.json({
        id: instance.id,
        status: "queued",
        topic,
      });
    }

    // 查询工作流状态
    if (url.pathname === "/status" && request.method === "GET") {
      const id = url.searchParams.get("id");
      if (!id) return new Response("Missing id", { status: 400 });
      const instance = await env.WORKFLOW.get(id);
      const status = await instance.status();
      return Response.json(status);
    }

    return new Response("Not found", { status: 404 });
  },
};

对应的 wrangler.toml 配置:

name = "agent-workflow"
main = "worker.ts"
compatibility_date = "2024-09-23"

[vars]
# 无需额外环境变量

[[kv_namespaces]]
binding = "KV"
id = "你的 KV namespace ID"

[ai]
binding = "AI"

[[workflows]]
binding = "WORKFLOW"
name = "agent-workflow"
class = "AgentWorkflow"

部署命令:

# 安装依赖
npm install wrangler --save-dev

# 登录 Cloudflare
npx wrangler login

# 创建 KV namespace(如果还没有)
npx wrangler kv namespace create "WORKFLOW_KV"

# 部署
npx wrangler deploy

触发工作流并查看状态:

# 触发一个工作流实例
curl -X POST https://agent-workflow.your-subdomain.workers.dev/trigger \
  -H "Content-Type: application/json" \
  -d '{"topic": "Cloudflare Workflows V2 确定性执行"}'

# 返回:{"id":"abc123","status":"queued","topic":"Cloudflare Workflows V2 确定性执行"}

# 查询执行状态
curl "https://agent-workflow.your-subdomain.workers.dev/status?id=abc123"

# 返回包含每个 step 的状态、耗时、输出摘要

关键点:如果 summarize 步骤因 LLM API 临时故障失败,引擎会重放整个工作流,但 search 步骤因为已经成功完成,其输出被直接复用——不会重复调用搜索 API。这就是确定性重放的核心价值。

采用建议与边界

适合上 V2 的场景

  • AI Agent 多步编排:搜索→推理→工具调用→存储,链路长、每步都可能失败。
  • 数据管道 ETL:拉取→清洗→写入,步骤间有明确数据依赖。
  • 异步后台任务:用户触发后长时间运行,需要保证最终完成。

需要注意的限制

  • 步骤粒度要合理:太细(每个 HTTP 调用拆一步)会增加持久化开销;太粗(整个 Agent 逻辑放一步)失败后重放代价大。建议按"一个有意义的业务操作"划分步骤。
  • 副作用要可接受重放:虽然引擎跳过已完成步骤,但首次执行中的副作用(比如发邮件)如果那一步还没完成就被中断,重放时会再执行一次。对不可逆操作,考虑加幂等保护(比如用唯一 ID 防重复发送)。
  • 冷启动延迟:边缘网络上的工作流实例启动比传统服务器快,但首次调用外部 API 时仍有延迟,重试配置要考虑这个因素。
  • 调试需要习惯重放模型:日志里可能出现"跳过步骤"的记录,这不是 bug,是引擎在正常工作——理解重放机制后反而更容易定位问题。

迁移检查清单

  1. 把现有长链路逻辑拆成 step.do() 调用,每步返回明确的值。
  2. 为可能失败的外部调用配置 retries 参数。
  3. 对不可逆副作用加幂等键或去重逻辑。
  4. /status API 或 Dashboard 验证每步执行状态是否符合预期。
  5. 压测时关注并发实例数是否接近 50K 上限——大多数场景远不需要,但大规模 Agent 部署要提前评估。

Workflows V2 不是万能编排层,它专注解决的是"长链路任务在分布式环境里可靠跑完"这个具体问题。如果你的场景正好卡在这里,确定性重放加上边缘网络的规模能力,值得一试。


相关推荐