持久化执行(durable execution)一直是分布式系统的硬骨头——工作流跑到一半,进程崩溃、网络断开、节点宕机,你得从上次成功的那一步接着走,而不是从头重来。传统做法是把状态交给外部编排引擎(Temporal、AWS Step Functions 等),数据库只负责存业务数据。微软最近开源的 PostgreSQL 扩展 pg_durable 换了个思路:让工作流直接在数据库里跑,状态和业务数据同处一进程,编排层不再需要额外部署。
为什么"外部编排"让人头疼
典型架构里,一个订单工作流长这样:
应用服务 → 调用 Temporal/Step Functions → 写状态到外部存储 → 读/写业务数据到 PostgreSQL
每一步都有代价:
- 双写一致性:编排引擎的状态和数据库的事务不在同一个事务里,中间崩溃就会出现"订单已创建但工作流标记为失败"这类不一致。
- 运维负担:Temporal Server 本身就是一个分布式系统,需要独立集群、持久化存储、监控和升级。
- 延迟叠加:每次状态变更都要跨网络调用编排引擎,再回到数据库,往返开销不可忽视。
pg_durable 的核心主张很简单:既然 PostgreSQL 本身就是一台久经考验的状态机(WAL、MVCC、崩溃恢复),为什么不直接用它来驱动工作流?
pg_durable 的工作模型
根据项目公开信息,pg_durable 把工作流的定义和执行状态都存入 PostgreSQL 表中,扩展内部负责:
- 步骤注册与调度——工作流的每个步骤(step)对应一个可调用的函数或 SQL 语句。
- 持久化 checkpoint——每完成一步,状态立刻写入 WAL,即使 PostgreSQL 进程崩溃,重启后也能从最近 checkpoint 恢复。
- 重试与补偿——失败步骤可配置重试策略,或触发补偿逻辑回滚已完成的副作用。
这意味着工作流的生命周期完全在数据库事务边界内管理,不再依赖外部协调者。
实践:用 pg_durable 定义一个订单处理工作流
以下示例展示如何在一个 PostgreSQL 数据库中安装 pg_durable 并定义一个包含三步的订单工作流。假设你已经有一个运行中的 PostgreSQL 16+ 实例。
安装扩展
-- 从源码编译安装(需要 pg_config 在 PATH 中)
-- git clone https://github.com/microsoft/pg_durable.git
-- cd pg_durable && make && make install
-- 在目标数据库中启用扩展
CREATE EXTENSION pg_durable;
定义工作流步骤
-- 注册一个名为 "order_process" 的工作流
SELECT pg_durable.register_workflow(
'order_process',
'处理电商订单的持久化工作流'
);
-- 添加步骤:验证库存
SELECT pg_durable.add_step(
'order_process',
'check_inventory',
-- step_handler 是一个自定义函数名,pg_durable 会调用它
'public.check_inventory_handler',
-- 重试配置:最多 3 次,间隔 5 秒
max_retries := 3,
retry_interval := INTERVAL '5 seconds'
);
-- 添加步骤:扣减库存并创建订单记录
SELECT pg_durable.add_step(
'order_process',
'create_order',
'public.create_order_handler',
max_retries := 2,
retry_interval := INTERVAL '3 seconds'
);
-- 添加步骤:发送通知
SELECT pg_durable.add_step(
'order_process',
'send_notification',
'public.send_notification_handler',
max_retries := 5,
retry_interval := INTERVAL '10 seconds'
);
编写步骤处理函数
-- 步骤1:检查库存是否充足
CREATE OR REPLACE FUNCTION public.check_inventory_handler(
workflow_id UUID,
step_input JSONB
) RETURNS JSONB LANGUAGE plpgsql AS $$
DECLARE
product_id INT;
qty INT;
available INT;
BEGIN
product_id := (step_input ->> 'product_id')::INT;
qty := (step_input ->> 'quantity')::INT;
SELECT stock INTO available
FROM inventory
WHERE product_id = product_id;
IF available < qty THEN
-- 返回失败结果,pg_durable 会按重试策略处理
RETURN jsonb_build_object(
'status', 'failed',
'reason', 'insufficient_stock',
'available', available
);
END IF;
RETURN jsonb_build_object(
'status', 'completed',
'product_id', product_id,
'quantity', qty
);
END;
$$;
-- 步骤2:创建订单(在同一个数据库事务中完成)
CREATE OR REPLACE FUNCTION public.create_order_handler(
workflow_id UUID,
step_input JSONB
) RETURNS JSONB LANGUAGE plpgsql AS $$
DECLARE
product_id INT;
qty INT;
order_id INT;
BEGIN
product_id := (step_input ->> 'product_id')::INT;
qty := (step_input ->> 'quantity')::INT;
-- 扣减库存
UPDATE inventory SET stock = stock - qty
WHERE product_id = product_id AND stock >= qty;
-- 创建订单记录
INSERT INTO orders (product_id, quantity, status)
VALUES (product_id, qty, 'created')
RETURNING id INTO order_id;
RETURN jsonb_build_object(
'status', 'completed',
'order_id', order_id
);
END;
$$;
-- 步骤3:发送通知(可能调用外部服务)
CREATE OR REPLACE FUNCTION public.send_notification_handler(
workflow_id UUID,
step_input JSONB
) RETURNS JSONB LANGUAGE plpgsql AS $$
DECLARE
order_id INT;
notify_result JSONB;
BEGIN
order_id := (step_input ->> 'order_id')::INT;
-- 这里可以调用外部 HTTP 服务(通过 pg_net 或类似扩展)
-- 以下为示意,实际需配合异步通知扩展
notify_result := jsonb_build_object(
'status', 'completed',
'order_id', order_id,
'channel', 'email'
);
RETURN notify_result;
END;
$$;
启动工作流实例
-- 发起一个订单处理工作流
SELECT pg_durable.start_workflow(
'order_process',
jsonb_build_object(
'product_id', 42,
'quantity', 3
)
);
启动后,pg_durable 会按步骤顺序依次调用处理函数,每步完成后 checkpoint 写入 WAL。如果 PostgreSQL 在第二步中途崩溃,重启后工作流会从 create_order 步骤重新执行——而不是从 check_inventory 重来。
查看工作流状态
-- 查询当前运行中的工作流
SELECT workflow_id, workflow_name, current_step, status, created_at
FROM pg_durable.workflows
WHERE status = 'running';
-- 查询某个工作流的步骤执行历史
SELECT step_name, status, attempt_count, started_at, completed_at
FROM pg_durable.step_history
WHERE workflow_id = '0192f8c4-7a3e-7d00-8000-000000000001'
ORDER BY started_at;
注意:以上函数签名和表名基于 pg_durable 公开文档的典型模式。具体 API 可能随版本迭代有差异,部署前请对照项目仓库中的 README 和 SQL 定义文件确认。
同库编排 vs 外部编排:取舍在哪
把编排引擎塞进数据库不是万能解,它有明确的优势和边界:
| 维度 | pg_durable(同库编排) | Temporal 等外部引擎 |
|---|---|---|
| 一致性 | 工作流状态与业务数据在同一事务,天然一致 | 需要额外机制保证双写一致 |
| 运维复杂度 | 无额外集群,随 PostgreSQL 一起管理 | 独立集群,独立升级和监控 |
| 延迟 | 步骤间调度在进程内,微秒级 | 跨网络调用,毫秒到秒级 |
| 可观测性 | 依赖 PostgreSQL 日志和查询 | 专用 UI、指标体系成熟 |
| 跨服务编排 | 适合单库内的工作流 | 天生支持跨服务、跨团队 |
| 语言绑定 | SQL / plpgsql,对数据库团队友好 | Go/Java/Python SDK,对应用团队友好 |
| 扩展性 | 受单节点 PostgreSQL 上限约束 | 可水平扩展至大规模工作流 |
适合 pg_durable 的场景:工作流步骤主要是数据库操作(库存扣减、状态流转、数据清洗),参与方都在同一个 PostgreSQL 内。
不适合的场景:工作流需要协调多个独立微服务、等待人工审批数天、或者工作流吞吐量远超单库承载能力——这时外部编排引擎仍然是更务实的选择。
上手清单
如果你打算试用 pg_durable,建议按这个顺序推进:
- 确认 PostgreSQL 版本——pg_durable 目前要求 PG 16+,检查你的实例版本。
- 从 GitHub 获取源码并编译——
make && make install,确保pg_config指向目标实例。 - 在测试库中启用扩展——
CREATE EXTENSION pg_durable;,先不要在生产库操作。 - 用一个两步工作流验证崩溃恢复——手动
pg_ctl stop -m immediate模拟硬崩溃,重启后确认工作流从断点续跑。 - 评估步骤函数的外部调用需求——如果某步需要调 HTTP API,考虑搭配
pg_net等异步 HTTP 扩展,避免在 plpgsql 中阻塞。 - 监控 WAL 增长——每步 checkpoint 都写 WAL,高频工作流下关注磁盘空间和归档策略。
pg_durable 把一个本来要额外部署一整套分布式系统的能力,压缩到了一个 PostgreSQL 扩展里。对于工作流逻辑集中在数据库内部的团队来说,这可能是最省力的持久化执行方案——但前提是你清楚它的边界,并在超出单库能力时及时切换到外部编排。