Uber 如何用 250ms 批量窗口让账本系统扛住每秒 30+ 次热账户更新

2026-06-04 27 预计阅读时间:1 分钟
来源:infoq.com AI 摘要 原文链接

免责声明:本文为 AI 摘要整理,建议结合原文阅读。摘要可能省略上下文、版本差异或边界条件,不作为官方说明。

预计阅读时间:14 分钟

金融账本系统有一个经典难题:某些"热账户"会被大量交易同时写入。一个高频乘客的账户在一秒内可能产生多笔扣款、退款、奖励入账,如果每笔交易都争抢同一行记录的锁,吞吐量很快就会塌方。Uber 在分布式账本基础设施中碰到了这个问题——原来多小时的批处理管道,他们用 250ms 批量窗口 + Redis 协调 + 乐观原子更新这套组合拳,压到了分钟级完成,单账户每秒稳定处理 30+ 次更新,同时保证一致性和可审计性。

热账户写入:为什么传统方案扛不住

账本的核心约束是:同一账户的余额变更必须串行化,否则就会出现余额计算错乱。传统做法有两种路线——

  • 行锁/悲观锁:每笔更新先锁住账户行,改完释放。热账户下锁争用剧烈,吞吐量随并发数指数级下降。
  • 队列串行化:把同一账户的所有更新排进一条队列,单消费者顺序处理。队列长度在高峰期暴涨,延迟从毫秒膨胀到秒甚至分钟。

Uber 的场景更苛刻:全球多区域部署,同一账户的更新可能从不同数据中心发起,本地队列无法自然串行化跨区域的写入。

250ms 批量窗口:把 N 次写入压成 1 次原子提交

Uber 的核心思路是:不逐笔更新余额,而是把一个短时间窗口内的所有变更攒成一批,一次性原子提交

窗口长度选了 250ms——这个数字不是拍脑袋的:

  • 太短(比如 10ms),攒不到足够多的更新,批量的收益小,反而增加了调度开销。
  • 太长(比如 1s+),用户感知到余额刷新延迟,体验下降。
  • 250ms 在 Uber 的负载下,平均能攒到 7-8 笔更新,批量合并的吞吐收益显著,而用户几乎感知不到延迟。

批量窗口的工作流程:

  1. 一笔交易到来时,不直接改账户余额,而是把变更金额追加到 Redis 中该账户的"待提交缓冲区"。
  2. 250ms 窗口到期时,负责该账户的处理器把缓冲区里所有变更合并成一个净差额(net delta),一次性写入持久化账本。
  3. 合并写入使用乐观原子更新:读取当前余额,计算新余额,用 CAS(compare-and-set)语义写入;如果期间余额被其他批次改了,重试整个批次。

这样,原来 30 次独立写入变成 3-4 次批量提交,锁争用降到原来的 1/8,吞吐量反而上升。

Redis 协调:跨区域的缓冲区聚合

批量窗口要生效,同一账户的所有更新必须落到同一个缓冲区。Uber 用 Redis 来做这个协调:

  • 每个账户的缓冲区由一个 Redis key 标识,key 里包含账户 ID 和当前窗口的 epoch 编号。
  • 写入端通过一致性哈希确定该账户对应的 Redis 分片,把变更金额 INCR 到缓冲区 key 上。Redis 的 INCR 本身是原子操作,并发写入不会丢失。
  • 窗口到期时,负责该分片的处理器 GETDEL 缓冲区 key,拿到整个窗口的净差额,进入持久化写入阶段。

Redis 在这里的角色是临时聚合层,不是持久存储。缓冲区数据生命周期只有 250ms,即使 Redis 宕机,也只是丢失一个窗口的聚合,原始交易记录还在上游系统里,可以重新触发。

乐观原子更新:批量提交的一致性保障

攒完差额后,写入持久化账本时仍要保证余额正确。Uber 用乐观并发控制替代悲观锁:

1. 读取账户当前余额 balance_old带版本号 v
2. 计算新余额 balance_new = balance_old + net_delta
3. 写入UPDATE account SET balance = balance_new, version = v+1
        WHERE account_id = X AND version = v
4. 如果 WHERE 条件不匹配版本已被改),重试步骤 1-3

因为批量已经把多次写入压缩成少数几次提交,CAS 冲突的概率大幅降低。即使偶尔冲突,重试一次就能成功——热账户的并发提交频率从 30/s 降到 3-4/s,两个批次撞在同一版本的几率很小。

实践:用 Python + Redis 模拟批量账本窗口

下面是一个可运行的简化实现,演示 250ms 批量窗口 + Redis 缓冲区 + 乐观原子更新的核心逻辑。你需要本地运行 Redis(redis-server),然后安装依赖:

pip install redis
redis-server  # 默认 6379 端口
import redis
import threading
import time
import uuid
from dataclasses import dataclass

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

BATCH_WINDOW_MS = 250
ACCOUNT_ID = "user_42"

# ── 1. 写入端:把变更追加到 Redis 缓冲区 ──
def submit_delta(account_id: str, delta: float, txn_id: str):
    """模拟一笔交易到来,把金额变更 INCR 到 Redis 缓冲区"""
    epoch = int(time.time() * 1000 // BATCH_WINDOW_MS)
    buffer_key = f"ledger:buffer:{account_id}:{epoch}"
    # 用 INCRBYFLOAT 原子追加差额(正数=入账,负数=扣款)
    r.incrbyfloat(buffer_key, delta)
    # 记录这笔交易属于哪个 epoch,用于审计追溯
    r.sadd(f"ledger:txns:{account_id}:{epoch}", txn_id)
    # 设置缓冲区 key 的 TTL,防止孤儿数据
    r.pexpire(buffer_key, BATCH_WINDOW_MS * 3)
    r.pexpire(f"ledger:txns:{account_id}:{epoch}", BATCH_WINDOW_MS * 3)
    print(f"  → txn {txn_id}: delta={delta}, epoch={epoch}")


# ── 2. 批处理器:每 250ms 收割缓冲区,乐观原子写入 ──
def batch_processor(account_id: str, stop_event: threading.Event):
    """每 250ms 收割当前 epoch 的缓冲区,合并写入持久层"""
    while not stop_event.is_set():
        time.sleep(BATCH_WINDOW_MS / 1000.0)
        epoch = int(time.time() * 1000 // BATCH_WINDOW_MS)
        buffer_key = f"ledger:buffer:{account_id}:{epoch}"
        txn_set_key = f"ledger:txns:{account_id}:{epoch}"

        net_delta = r.get(buffer_key)
        if net_delta is None:
            continue  # 本窗口无变更,跳过

        net_delta = float(net_delta)
        txn_ids = r.smembers(txn_set_key)

        # 乐观原子更新(模拟 CAS)
        max_retries = 5
        for attempt in range(max_retries):
            # 读取当前余额和版本
            current = r.hgetall(f"ledger:account:{account_id}")
            if not current:
                current = {"balance": "0.0", "version": "0"}
            old_balance = float(current["balance"])
            old_version = int(current["version"])

            new_balance = old_balance + net_delta
            new_version = old_version + 1

            # CAS 写入:用 Lua 脚本保证原子性
            lua_cas = """
            local key = KEYS[1]
            local expected_version = tonumber(ARGV[1])
            local new_balance = ARGV[2]
            local new_version = tonumber(ARGV[3])

            local current_version = tonumber(redis.call('HGET', key, 'version') or '0')
            if current_version == expected_version then
                redis.call('HSET', key, 'balance', new_balance, 'version', new_version)
                return 1
            else
                return 0
            end
            """
            success = r.eval(lua_cas, 1,
                             f"ledger:account:{account_id}",
                             old_version, str(new_balance), new_version)

            if success:
                # 写入成功,清理缓冲区
                r.delete(buffer_key, txn_set_key)
                # 写入审计日志
                audit_entry = f"epoch={epoch}|delta={net_delta}|txns={','.join(txn_ids)}|new_balance={new_balance}"
                r.rpush(f"ledger:audit:{account_id}", audit_entry)
                print(f"  ✓ BATCH COMMIT: epoch={epoch}, net_delta={net_delta}, "
                      f"txns={len(txn_ids)}, balance={old_balance}{new_balance}")
                break
            else:
                print(f"  ⚠ CAS conflict (attempt {attempt+1}), retrying...")
                time.sleep(0.01)  # 短暂等待后重试


# ── 3. 模拟并发写入 ──
def simulate_concurrent_writes(account_id: str, num_txns: int):
    """模拟多线程并发提交交易"""
    threads = []
    for i in range(num_txns):
        delta = 10.0 if i % 3 == 0 else -5.0  # 模拟混合入账/扣款
        txn_id = str(uuid.uuid4())[:8]
        t = threading.Thread(target=submit_delta,
                             args=(account_id, delta, txn_id))
        threads.append(t)
        t.start()
        time.sleep(0.03)  # 约 30ms 一笔,模拟 30+ updates/s
    for t in threads:
        t.join()


# ── 4. 运行演示 ──
if __name__ == "__main__":
    # 清理测试数据
    for key in r.keys(f"ledger:*:{ACCOUNT_ID}*"):
        r.delete(key)

    # 启动批处理器
    stop = threading.Event()
    processor = threading.Thread(target=batch_processor,
                                 args=(ACCOUNT_ID, stop))
    processor.start()

    # 模拟 1 秒内约 30 笔并发写入
    print("=== Simulating ~30 concurrent updates in 1s ===")
    simulate_concurrent_writes(ACCOUNT_ID, 30)

    # 等待最后一个窗口处理完
    time.sleep(0.5)
    stop.set()
    processor.join()

    # 打印最终状态
    final = r.hgetall(f"ledger:account:{ACCOUNT_ID}")
    audit_log = r.lrange(f"ledger:audit:{ACCOUNT_ID}", 0, -1)
    print(f"\n=== Final State ===")
    print(f"Balance: {final.get('balance', '0.0')}, Version: {final.get('version', '0')}")
    print(f"Audit entries: {len(audit_log)}")
    for entry in audit_log:
        print(f"  {entry}")

运行后你会看到:30 笔独立写入被压缩成 3-4 次批量提交,每次提交包含多笔交易的净差额,审计日志完整记录了每批包含的交易 ID。这就是 Uber 方案的核心骨架。

注意:这是演示版,生产环境还需要补充——缓冲区收割的容错(处理器崩溃时窗口数据不能丢)、Redis 集群分片路由、跨区域 epoch 同步对齐、持久层从 Redis Hash 替换为真实数据库等。

从小时到分钟:批量窗口的系统性收益

Uber 报告的效果不仅是单账户吞吐提升。原来的账本处理是小时级批处理管道——交易数据先落盘到 Hive,再由 Spark 作业批量计算余额变更,整个链路延迟以小时计。

换成 250ms 实时批量窗口后:

  • 延迟:从小时级降到秒级(250ms 窗口 + 1-2 次 CAS 重试)。
  • 吞吐:单账户 30+ updates/s,全局可水平扩展(不同账户路由到不同 Redis 分片和处理器)。
  • 一致性:乐观 CAS + 审计日志保证余额正确且可追溯。
  • 容错:Redis 只做临时聚合,持久化失败时可以从原始交易重放。

采用建议与边界

这套方案不是万能药,适用前想清楚几个问题:

考量 判断
账户更新频率 如果大部分账户 < 5 updates/s,逐笔写入就够了,批量窗口的复杂度不值得
延迟容忍度 250ms 窗口意味着余额刷新有 250ms 延迟,金融交易对余额实时性要求极高时需要评估
审计需求 批量合并后单笔交易不直接对应一次余额变更,审计链需要额外设计(如上面的 epoch→txn_ids 映射)
Redis 依赖 Redis 成为实时路径的关键依赖,需要集群高可用保障;但持久层失败不影响数据完整性
跨区域 epoch 多区域部署时,同一账户的更新可能落入不同 epoch,需要全局时钟或逻辑 epoch 对齐机制

如果你的系统确实有热账户写入瓶颈,可以按这个路径逐步引入:

  1. 先加监控:统计 Top-N 账户的写入 QPS,确认瓶颈真实存在。
  2. 引入缓冲区层:用 Redis INCR 做变更聚合,先不改持久层,观察缓冲区效果。
  3. 加批量收割器:按固定窗口收割缓冲区,乐观写入持久层。
  4. 补审计和容错:epoch→交易映射、收割器 crash recovery、Redis HA。

不要一步到位——先让缓冲区层跑起来,验证聚合比例符合预期,再逐步替换持久写入路径。


相关推荐