Anthropic 给 Claude Code 加了一个狠活——动态工作流(Dynamic Workflows)。核心思路很简单:让 Claude 自己写编排脚本,在一个会话里同时跑几十甚至上百个子 agent,每个子 agent 各干各的,最后汇总结果。原来按季度排期的工程级任务,压缩到几天甚至一个周末完成。
他们拿这个功能做了一次验证:把 Bun 的 Zig 代码移植到 Rust。整个移植过程,从拆解任务到并行执行再到合并验证,全由 Claude Code 的动态工作流驱动完成。
动态工作流到底动了什么
传统用法里,Claude Code 是单线程的——你给一个指令,它一步步执行,遇到问题停下来问你。任务大了,要么手动拆成多个会话,要么反复来回确认,效率天花板很低。
动态工作流打破了这个天花板,关键变化有三点:
- Claude 自己写编排脚本。不再是人去拆任务,Claude 根据目标自动生成一个类似 Makefile 的执行计划,定义哪些子任务可以并行、哪些有依赖关系。
- 数十到数百个子 agent 并行运行。每个子 agent 是独立的 Claude Code 实例,有自己的上下文和工具权限,能读文件、写代码、跑测试。
- 会话级汇总。所有子 agent 的结果回到主会话,Claude 做合并、冲突解决和最终验证。
这本质上是从"人指挥 AI"变成了"AI 指挥 AI"。人只负责定义目标和验收标准。
Bun Zig→Rust 移植:一个真实案例的解剖
Bun 的运行时核心原本用 Zig 编写。Zig 有出色的 C 互操作和手动内存控制,但生态和人才池相比 Rust 偏小。把核心模块迁移到 Rust,工程量不小——涉及数千个函数的语义等价转换、测试套件的逐个对齐、以及两个语言在内存模型和错误处理上的差异。
用动态工作流的执行路径大致如下:
主 agent 分析 Bun 源码结构 → 生成移植计划
├── 子 agent 1: 移植 runtime/core.zig → runtime/core.rs
├── 子 agent 2: 移植 runtime/jsc.zig → runtime/jsc.rs
├── 子 agent 3: 移植 runtime/bundler.zig → runtime/bundler.rs
├── ...
├── 子 agent N: 移植第 N 个模块
└── 所有子 agent 完成后 → 主 agent 合并、解决冲突、跑全量测试
每个子 agent 拿到的是明确的输入文件、输出文件和语义等价要求。它们不需要知道全局进度,只需要把分配给自己的 Zig 模块翻译成 Rust,并保证对应测试通过。
这种拆法让原本串行需要数周的工作,在并行度拉满后压缩到周末两天。
实操:用 Claude Code 动态工作流跑一个并行重构
下面用一个可运行的小项目演示动态工作流的使用方式。假设你有一个 Python 项目的三个模块需要从同步代码改成 async 版本,我们让 Claude Code 自动编排并行改造。
第一步:准备项目结构
# 创建示例项目
mkdir async-refactor-demo && cd async-refactor-demo
# 三个待改造的同步模块
cat > fetcher.py << 'EOF'
import requests
def get_user(id):
resp = requests.get(f"https://api.example.com/users/{id}")
return resp.json()
def get_all_users():
resp = requests.get("https://api.example.com/users")
return resp.json()
EOF
cat > processor.py << 'EOF'
import json
def transform_user(raw):
return {
"id": raw["id"],
"display": f"{raw['first']} {raw['last']}"
}
def batch_transform(users):
return [transform_user(u) for u in users]
EOF
cat > writer.py << 'EOF'
import csv
def save_users(users, path="output.csv"):
with open(path, "w", newline="") as f:
w = csv.DictWriter(f, fieldnames=["id", "display"])
w.writeheader()
w.writerows(users)
return path
EOF
# 测试文件
cat > test_sync.py << 'EOF'
from fetcher import get_user, get_all_users
from processor import transform_user, batch_transform
from writer import save_users
def test_transform():
result = transform_user({"id": 1, "first": "A", "last": "B"})
assert result["display"] == "A B"
def test_batch():
users = [{"id": 1, "first": "A", "last": "B"}]
results = batch_transform(users)
assert len(results) == 1
EOF
第二步:用 Claude Code 发起动态工作流
# 确保 Claude Code 已安装并登录
claude --version
# 发起动态工作流任务
claude --dynamic-workflow \
"将 fetcher.py、processor.py、writer.py 三个模块从同步改为 async。
每个模块独立改造,并行执行。
要求:
1. fetcher.py 用 aiohttp 替换 requests
2. processor.py 的 batch_transform 改为 asyncio.gather 并发处理
3. writer.py 用 aiofiles 替换同步文件操作
4. 所有改造后函数名加 async_ 前缀,原同步函数保留作兼容
5. 更新 test_sync.py 为 test_async.py,用 pytest-asyncio
6. 每个子 agent 完成后跑对应测试确认通过"
Claude Code 收到这个指令后,会自动:
- 分析三个模块的依赖关系——发现它们之间没有强依赖,可以完全并行。
- 生成编排脚本,为每个模块分配一个子 agent。
- 并行启动三个子 agent,各自独立改造。
- 等所有子 agent 完成后,主 agent 合并结果,跑全量测试。
第三步:查看编排结果
# Claude Code 会在项目目录生成工作流记录
cat .claude-workflow/plan.json
预期输出类似:
{
"task": "async-refactor",
"subagents": [
{
"id": "sa-1",
"target": "fetcher.py",
"status": "completed",
"tests_passed": true
},
{
"id": "sa-2",
"target": "processor.py",
"status": "completed",
"tests_passed": true
},
{
"id": "sa-3",
"target": "writer.py",
"status": "completed",
"tests_passed": true
}
],
"merge_conflicts": 0,
"final_test_run": "passed"
}
改造完成后,你可以直接验证:
# 安装 async 依赖
pip install aiohttp aiofiles pytest-asyncio
# 跑测试
pytest test_async.py -v
编排脚本:动态工作流的真正引擎
动态工作流不是简单的"多开几个窗口并行跑"。它的核心是 Claude 自己编写的编排脚本——一个声明式的执行计划,描述任务拓扑、依赖关系和汇总策略。
你可以直接查看 Claude 生成的编排脚本:
cat .claude-workflow/orchestration.sh
典型内容类似:
#!/usr/bash
# Claude Code 动态工作流编排脚本
# 生成时间: 2024-XX-XX
# 阶段1: 并行改造(无依赖,可全并行)
claude --subagent --id sa-1 --task "改造 fetcher.py 为 async 版本" &
claude --subagent --id sa-2 --task "改造 processor.py 为 async 版本" &
claude --subagent --id sa-3 --task "改造 writer.py 为 async 版本" &
wait # 等待所有并行子 agent 完成
# 阶段2: 合并与验证(依赖阶段1全部完成)
claude --merge-results --from sa-1,sa-2,sa-3
claude --run-tests --pattern "test_async.py"
这个脚本的关键特征:
- 动态生成:不是预定义模板,Claude 根据你的具体任务和代码结构实时生成。
- 依赖感知:如果模块之间有依赖(比如 B 依赖 A 的接口),Claude 会把 A 排在 B 前面,不会盲目全并行。
- 可审查:脚本在执行前会展示给你,你可以修改拓扑、增删子任务、调整并行度。
边界与风险:别把所有事都甩给 AI
动态工作流很强,但有几个现实约束需要正视:
冲突概率随并行度上升。 两个子 agent 同时改同一个文件的不同函数,合并时可能产生语义冲突——不是 git 冲突那么简单,而是逻辑层面的不一致。Bun 移植案例中,主 agent 花了大量时间做冲突解决和语义对齐。
上下文窗口是硬限制。 每个子 agent 有独立的上下文,它看不到其他子 agent 的实时进度。如果模块间有隐式依赖(比如共享的宏定义或类型别名),子 agent 可能做出不一致的假设。
测试覆盖决定质量上限。 动态工作流能加速执行,但不能替你写测试。如果原项目测试覆盖不足,子 agent 改出来的代码"看起来等价"但实际有微妙差异,合并后才会爆雷。
成本与并行度的权衡。 上百个子 agent 同时跑,API 调用成本不是小数目。建议先用小范围试跑,确认编排脚本合理后再拉满并行度。
实用建议:
| 场景 | 建议并行度 | 原因 |
|---|---|---|
| 模块间无依赖(如独立工具函数移植) | 高(20-100) | 全并行,合并成本低 |
| 模块间有接口依赖(如分层架构重构) | 中(5-10) | 按层分批,每层内并行 |
| 涉及共享状态或全局配置改动 | 低(1-3) | 冲突风险高,串行更稳 |
什么时候该用动态工作流
不是所有任务都适合。判断标准很简单:
适合的信号: - 任务可以拆成独立子块(文件级、模块级、函数级) - 每个子块有明确的输入和验收条件 - 合并规则清晰(文件级合并、接口对齐)
不适合的信号: - 改动高度耦合,一个函数的改动影响十个文件 - 验收条件模糊,需要人反复判断"这样改对不对" - 涉及安全敏感操作(生产数据库迁移、密钥轮换)
一个务实的起步方式:拿一个你有完整测试覆盖的项目,选一个可以模块级拆分的重构任务,先用 5-10 个子 agent 试跑。观察编排脚本是否合理、合并是否有冲突、最终测试是否全过。跑顺了再逐步加大并行度。
动态工作流把 AI 从"单兵作战"升级成了"指挥官+军团"的模式。但指挥官的水平取决于你给的目标有多清晰,军团的表现取决于测试覆盖有多扎实。工具提速了执行,但定义正确的问题和验收标准,仍然是人的活。