把持久化工作流搬进数据库:微软开源 pg_durable

2026-06-11 18 预计阅读时间: 1 分钟
来源: infoq.com AI 摘要 Original link

Disclaimer: This article is an AI-assisted summary. Read it together with the original source when precision matters. The summary may omit context, version differences, or edge cases and is not official documentation.

预计阅读时间:10 分钟

持久化执行(durable execution)一直是分布式系统的硬骨头——工作流跑到一半,进程崩溃、网络断开、节点宕机,你得从上次成功的那一步接着走,而不是从头重来。传统做法是把状态交给外部编排引擎(Temporal、AWS Step Functions 等),数据库只负责存业务数据。微软最近开源的 PostgreSQL 扩展 pg_durable 换了个思路:让工作流直接在数据库里跑,状态和业务数据同处一进程,编排层不再需要额外部署。

为什么"外部编排"让人头疼

典型架构里,一个订单工作流长这样:

应用服务 → 调用 Temporal/Step Functions → 写状态到外部存储 → 读/写业务数据到 PostgreSQL

每一步都有代价:

  • 双写一致性:编排引擎的状态和数据库的事务不在同一个事务里,中间崩溃就会出现"订单已创建但工作流标记为失败"这类不一致。
  • 运维负担:Temporal Server 本身就是一个分布式系统,需要独立集群、持久化存储、监控和升级。
  • 延迟叠加:每次状态变更都要跨网络调用编排引擎,再回到数据库,往返开销不可忽视。

pg_durable 的核心主张很简单:既然 PostgreSQL 本身就是一台久经考验的状态机(WAL、MVCC、崩溃恢复),为什么不直接用它来驱动工作流?

pg_durable 的工作模型

根据项目公开信息,pg_durable 把工作流的定义和执行状态都存入 PostgreSQL 表中,扩展内部负责:

  1. 步骤注册与调度——工作流的每个步骤(step)对应一个可调用的函数或 SQL 语句。
  2. 持久化 checkpoint——每完成一步,状态立刻写入 WAL,即使 PostgreSQL 进程崩溃,重启后也能从最近 checkpoint 恢复。
  3. 重试与补偿——失败步骤可配置重试策略,或触发补偿逻辑回滚已完成的副作用。

这意味着工作流的生命周期完全在数据库事务边界内管理,不再依赖外部协调者。

实践:用 pg_durable 定义一个订单处理工作流

以下示例展示如何在一个 PostgreSQL 数据库中安装 pg_durable 并定义一个包含三步的订单工作流。假设你已经有一个运行中的 PostgreSQL 16+ 实例。

安装扩展

-- 从源码编译安装(需要 pg_config 在 PATH 中)
-- git clone https://github.com/microsoft/pg_durable.git
-- cd pg_durable && make && make install

-- 在目标数据库中启用扩展
CREATE EXTENSION pg_durable;

定义工作流步骤

-- 注册一个名为 "order_process" 的工作流
SELECT pg_durable.register_workflow(
  'order_process',
  '处理电商订单的持久化工作流'
);

-- 添加步骤:验证库存
SELECT pg_durable.add_step(
  'order_process',
  'check_inventory',
  -- step_handler 是一个自定义函数名,pg_durable 会调用它
  'public.check_inventory_handler',
  -- 重试配置:最多 3 次,间隔 5 秒
  max_retries := 3,
  retry_interval := INTERVAL '5 seconds'
);

-- 添加步骤:扣减库存并创建订单记录
SELECT pg_durable.add_step(
  'order_process',
  'create_order',
  'public.create_order_handler',
  max_retries := 2,
  retry_interval := INTERVAL '3 seconds'
);

-- 添加步骤:发送通知
SELECT pg_durable.add_step(
  'order_process',
  'send_notification',
  'public.send_notification_handler',
  max_retries := 5,
  retry_interval := INTERVAL '10 seconds'
);

编写步骤处理函数

-- 步骤1:检查库存是否充足
CREATE OR REPLACE FUNCTION public.check_inventory_handler(
  workflow_id UUID,
  step_input JSONB
) RETURNS JSONB LANGUAGE plpgsql AS $$
DECLARE
  product_id INT;
  qty INT;
  available INT;
BEGIN
  product_id := (step_input ->> 'product_id')::INT;
  qty := (step_input ->> 'quantity')::INT;

  SELECT stock INTO available
    FROM inventory
    WHERE product_id = product_id;

  IF available < qty THEN
    -- 返回失败结果,pg_durable 会按重试策略处理
    RETURN jsonb_build_object(
      'status', 'failed',
      'reason', 'insufficient_stock',
      'available', available
    );
  END IF;

  RETURN jsonb_build_object(
    'status', 'completed',
    'product_id', product_id,
    'quantity', qty
  );
END;
$$;

-- 步骤2:创建订单(在同一个数据库事务中完成)
CREATE OR REPLACE FUNCTION public.create_order_handler(
  workflow_id UUID,
  step_input JSONB
) RETURNS JSONB LANGUAGE plpgsql AS $$
DECLARE
  product_id INT;
  qty INT;
  order_id INT;
BEGIN
  product_id := (step_input ->> 'product_id')::INT;
  qty := (step_input ->> 'quantity')::INT;

  -- 扣减库存
  UPDATE inventory SET stock = stock - qty
    WHERE product_id = product_id AND stock >= qty;

  -- 创建订单记录
  INSERT INTO orders (product_id, quantity, status)
    VALUES (product_id, qty, 'created')
    RETURNING id INTO order_id;

  RETURN jsonb_build_object(
    'status', 'completed',
    'order_id', order_id
  );
END;
$$;

-- 步骤3:发送通知(可能调用外部服务)
CREATE OR REPLACE FUNCTION public.send_notification_handler(
  workflow_id UUID,
  step_input JSONB
) RETURNS JSONB LANGUAGE plpgsql AS $$
DECLARE
  order_id INT;
  notify_result JSONB;
BEGIN
  order_id := (step_input ->> 'order_id')::INT;

  -- 这里可以调用外部 HTTP 服务(通过 pg_net 或类似扩展)
  -- 以下为示意,实际需配合异步通知扩展
  notify_result := jsonb_build_object(
    'status', 'completed',
    'order_id', order_id,
    'channel', 'email'
  );

  RETURN notify_result;
END;
$$;

启动工作流实例

-- 发起一个订单处理工作流
SELECT pg_durable.start_workflow(
  'order_process',
  jsonb_build_object(
    'product_id', 42,
    'quantity', 3
  )
);

启动后,pg_durable 会按步骤顺序依次调用处理函数,每步完成后 checkpoint 写入 WAL。如果 PostgreSQL 在第二步中途崩溃,重启后工作流会从 create_order 步骤重新执行——而不是从 check_inventory 重来。

查看工作流状态

-- 查询当前运行中的工作流
SELECT workflow_id, workflow_name, current_step, status, created_at
  FROM pg_durable.workflows
  WHERE status = 'running';

-- 查询某个工作流的步骤执行历史
SELECT step_name, status, attempt_count, started_at, completed_at
  FROM pg_durable.step_history
  WHERE workflow_id = '0192f8c4-7a3e-7d00-8000-000000000001'
  ORDER BY started_at;

注意:以上函数签名和表名基于 pg_durable 公开文档的典型模式。具体 API 可能随版本迭代有差异,部署前请对照项目仓库中的 README 和 SQL 定义文件确认。

同库编排 vs 外部编排:取舍在哪

把编排引擎塞进数据库不是万能解,它有明确的优势和边界:

维度 pg_durable(同库编排) Temporal 等外部引擎
一致性 工作流状态与业务数据在同一事务,天然一致 需要额外机制保证双写一致
运维复杂度 无额外集群,随 PostgreSQL 一起管理 独立集群,独立升级和监控
延迟 步骤间调度在进程内,微秒级 跨网络调用,毫秒到秒级
可观测性 依赖 PostgreSQL 日志和查询 专用 UI、指标体系成熟
跨服务编排 适合单库内的工作流 天生支持跨服务、跨团队
语言绑定 SQL / plpgsql,对数据库团队友好 Go/Java/Python SDK,对应用团队友好
扩展性 受单节点 PostgreSQL 上限约束 可水平扩展至大规模工作流

适合 pg_durable 的场景:工作流步骤主要是数据库操作(库存扣减、状态流转、数据清洗),参与方都在同一个 PostgreSQL 内。

不适合的场景:工作流需要协调多个独立微服务、等待人工审批数天、或者工作流吞吐量远超单库承载能力——这时外部编排引擎仍然是更务实的选择。

上手清单

如果你打算试用 pg_durable,建议按这个顺序推进:

  1. 确认 PostgreSQL 版本——pg_durable 目前要求 PG 16+,检查你的实例版本。
  2. 从 GitHub 获取源码并编译——make && make install,确保 pg_config 指向目标实例。
  3. 在测试库中启用扩展——CREATE EXTENSION pg_durable;,先不要在生产库操作。
  4. 用一个两步工作流验证崩溃恢复——手动 pg_ctl stop -m immediate 模拟硬崩溃,重启后确认工作流从断点续跑。
  5. 评估步骤函数的外部调用需求——如果某步需要调 HTTP API,考虑搭配 pg_net 等异步 HTTP 扩展,避免在 plpgsql 中阻塞。
  6. 监控 WAL 增长——每步 checkpoint 都写 WAL,高频工作流下关注磁盘空间和归档策略。

pg_durable 把一个本来要额外部署一整套分布式系统的能力,压缩到了一个 PostgreSQL 扩展里。对于工作流逻辑集中在数据库内部的团队来说,这可能是最省力的持久化执行方案——但前提是你清楚它的边界,并在超出单库能力时及时切换到外部编排。


相关推荐