扇出架构的慢请求不是故障:自适应 Hedged Request 如何把 p99 延迟砍掉 74%

2026-05-28 20 预计阅读时间:1 分钟
来源:infoq.com AI 摘要 原文链接

免责声明:本文为 AI 摘要整理,建议结合原文阅读。摘要可能省略上下文、版本差异或边界条件,不作为官方说明。

预计阅读时间:13 分钟

你监控每个服务的 p99 都在 200ms 以内,但网关层的 p99 却飙到 800ms——这不是监控出了问题,而是扇出架构里"慢但没挂"的请求在层层叠加。Prathamesh Bhope 在这篇文章里提出了一套自适应对冲请求(adaptive hedged request)方案:用 DDSketch 实时估算分位数决定何时发对冲请求,用滑动窗口应对分布漂移,用令牌桶限制对冲请求总量,最终把 p99 延迟压低了 74%。

扇出架构的延迟叠加效应

扇出(fan-out)是微服务里最常见的拓扑:一个请求到达网关后,并行调用 N 个下游服务,等最慢的那个返回才算完成。假设每个服务 99% 的请求在 200ms 内完成,但各有 1% 的请求要 600ms。如果扇出因子是 5:

  • 单服务 p99 = 600ms
  • 网关层 p99 ≈ 1 − (0.99)^5 ≈ 4.9% 的请求会撞上至少一个慢下游,p99 跳到 600ms 甚至更高

这就是"straggler"问题——请求没有失败,只是慢了。传统超时策略要么太激进(误杀正常慢请求),要么太保守(p99 没改善)。重试也不合适:慢请求最终会完成,重试只是多了一份负载。

Hedged Request:发一个对冲请求,取最快的

Google 2015 年的论文《The Tail at Scale》提出了 hedged request:先发一个正常请求,如果在"合理时间"内没收到响应,再发一个对冲请求,两个都等,取最先返回的,取消另一个。

核心问题变成:"合理时间"怎么定?

  • 固定阈值(比如 200ms)在负载变化时会失效:低负载时 200ms 太晚,高负载时 200ms 太早,大量对冲请求反而加剧延迟
  • 需要一个能跟随实时分布动态调整的阈值

三组件自适应对冲机制

Bhope 的方案由三个组件协同工作:

1. DDSketch 实时分位数估算

DDSketch 是 Datadog 提出的分位数草图算法,核心特点:

  • 可合并(mergeable):多个实例的 sketch 可以合并,适合分布式场景
  • 相对误差保证:设定 α 后,任何分位数的相对误差不超过 α
  • 内存可控:用 collapsing 机制压缩低频桶

用它来实时追踪下游服务的 p95 或 p99,作为对冲阈值的基准。比如设定阈值 = p95 × 1.5,意味着"如果一个请求已经比 95% 的请求慢了 50%,就发对冲"。

2. 滑动窗口应对分布漂移

服务延迟分布不是静态的:流量高峰、部署变更、缓存冷启动都会让分布漂移。如果 sketch 无限累积,旧数据会拖累阈值判断。

方案是窗口化旋转:维护两个 sketch——当前窗口和上一个窗口。当前窗口接收新数据,上一个窗口提供历史参考。窗口到期时,当前窗口变为上一个窗口,新的当前窗口从零开始。阈值计算基于两个窗口的加权合并:

effective_p95 = 0.7 × current_p95 + 0.3 × previous_p95

这保证了阈值既跟得上短期变化,又不被瞬时抖动带飞。

3. 令牌桶限制对冲放大

对冲请求本质上是多发一份流量。如果下游已经过载,大量对冲请求会让情况更糟。令牌桶(token bucket)是经典的限流手段:

  • 桶容量 C:允许短时间内最多 C 个对冲请求
  • 填充速率 r:每秒恢复 r 个令牌

只有拿到令牌的对冲请求才会发出,拿不到就等原始请求完成。这把对冲请求的额外负载限制在可控范围内。

实战:用 Python 实现一个自适应 Hedged Client

下面是一个可运行的简化实现,包含 DDSketch 阈值计算、窗口旋转和令牌桶限流。依赖 ddsketch 库(pip install ddsketch)。

"""
Adaptive Hedged Request Client — 简化演示版
pip install ddsketch
"""

import asyncio
import time
import random
from ddsketch import DDSketch

# ---------- 配置 ----------
HEDGE_ALPHA = 0.01          # DDSketch 相对误差
HEDGE_QUANTILE = 0.95       # 用 p95 作为基准
HEDGE_MULTIPLIER = 1.5      # 阈值 = p95 × 1.5
WINDOW_SECONDS = 30         # 窗口旋转周期
WINDOW_WEIGHT_CURRENT = 0.7 # 当前窗口权重
TOKEN_BUCKET_CAPACITY = 20  # 令牌桶容量
TOKEN_BUCKET_REFILL_RATE = 5 # 每秒恢复令牌数
MAX_CONCURRENT_HEDGES = 3   # 同时最多几个对冲请求


class AdaptiveHedgedClient:
    def __init__(self):
        # 两个 sketch 窗口
        self.current_sketch = DDSketch(HEDGE_ALPHA)
        self.previous_sketch = DDSketch(HEDGE_ALPHA)
        self.window_start = time.monotonic()

        # 令牌桶
        self.tokens = TOKEN_BUCKET_CAPACITY
        self.last_refill = time.monotonic()

        self._lock = asyncio.Lock()

    def _refill_tokens(self):
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(
            TOKEN_BUCKET_CAPACITY,
            self.tokens + elapsed * TOKEN_BUCKET_REFILL_RATE,
        )
        self.last_refill = now

    async def _try_acquire_token(self) -> bool:
        async with self._lock:
            self._refill_tokens()
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False

    def _rotate_window_if_needed(self):
        now = time.monotonic()
        if now - self.window_start >= WINDOW_SECONDS:
            self.previous_sketch = self.current_sketch
            self.current_sketch = DDSketch(HEDGE_ALPHA)
            self.window_start = now

    def _get_hedge_threshold(self) -> float | None:
        """计算对冲阈值,窗口数据不足时返回 None(不发对冲)"""
        self._rotate_window_if_needed()

        # 当前窗口必须有足够样本
        if self.current_sketch.count < 20:
            # 样本不足,只用上一个窗口(如果有的话)
            if self.previous_sketch.count >= 20:
                return self.previous_sketch.get_quantile_value(HEDGE_QUANTILE) * HEDGE_MULTIPLIER
            return None

        current_p95 = self.current_sketch.get_quantile_value(HEDGE_QUANTILE)

        if self.previous_sketch.count >= 20:
            previous_p95 = self.previous_sketch.get_quantile_value(HEDGE_QUANTILE)
            effective_p95 = (
                WINDOW_WEIGHT_CURRENT * current_p95
                + (1 - WINDOW_WEIGHT_CURRENT) * previous_p95
            )
        else:
            effective_p95 = current_p95

        return effective_p95 * HEDGE_MULTIPLIER

    def _record_latency(self, latency_ms: float):
        self._rotate_window_if_needed()
        self.current_sketch.add(latency_ms)

    async def call_with_hedge(self, downstream_call, label: str = "") -> tuple:
        """
        downstream_call: async callable,返回 (result, latency_ms)
        返回: (result, was_hedged: bool, original_latency_ms, hedge_latency_ms|None)
        """
        threshold = self._get_hedge_threshold()

        # 发原始请求
        start = time.monotonic()
        orig_task = asyncio.create_task(downstream_call())

        if threshold is None:
            # 没有足够数据,不发对冲
            result, lat = await orig_task
            self._record_latency(lat)
            return (result, False, lat, None)

        # 等到阈值时间,看原始请求是否已返回
        hedge_task = None
        try:
            remaining = threshold / 1000.0 - (time.monotonic() - start)
            if remaining > 0:
                done, _ = await asyncio.wait({orig_task}, timeout=remaining)
                if done:
                    result, lat = orig_task.result()
                    self._record_latency(lat)
                    return (result, False, lat, None)
        except asyncio.CancelledError:
            pass

        # 原始请求还没返回 → 尝试发对冲
        can_hedge = await self._try_acquire_token()
        if can_hedge:
            hedge_task = asyncio.create_task(downstream_call())
            # 等任意一个完成
            done, pending = await asyncio.wait(
                {orig_task, hedge_task}, return_when=asyncio.FIRST_COMPLETED
            )
            # 取最先完成的结果
            winner = done.pop()
            result, lat = winner.result()
            self._record_latency(lat)

            # 取另一个的延迟(不等结果,只记录取消情况)
            loser = pending.pop()
            loser.cancel()
            try:
                await loser
            except (asyncio.CancelledError, Exception):
                pass

            orig_lat = lat if winner == orig_task else None
            hedge_lat = lat if winner == hedge_task else None
            return (result, True, orig_lat, hedge_lat)
        else:
            # 没令牌,只能等原始请求
            result, lat = await orig_task
            self._record_latency(lat)
            return (result, False, lat, None)


# ---------- 模拟下游服务 ----------
async def mock_downstream():
    """模拟下游:90% 在 100ms 内,5% 在 300ms,5% 在 800ms(straggler)"""
    r = random.random()
    if r < 0.90:
        lat = random.uniform(80, 120)
    elif r < 0.95:
        lat = random.uniform(280, 320)
    else:
        lat = random.uniform(750, 850)
    await asyncio.sleep(lat / 1000.0)
    return ("ok", lat)


# ---------- 运行演示 ----------
async def main():
    client = AdaptiveHedgedClient()
    results = []

    # 发 200 个请求,观察 p99 变化
    for i in range(200):
        result, was_hedged, orig_lat, hedge_lat = await client.call_with_hedge(
            mock_downstream, label=f"req-{i}"
        )
        effective_lat = orig_lat or hedge_lat
        results.append(effective_lat)
        if i % 50 == 0:
            sorted_lats = sorted(results)
            p50 = sorted_lats[int(len(sorted_lats) * 0.50)]
            p95 = sorted_lats[int(len(sorted_lats) * 0.95)]
            p99 = sorted_lats[int(len(sorted_lats) * 0.99)]
            print(f"[{i:>3} reqs] p50={p50:.0f}ms  p95={p95:.0f}ms  p99={p99:.0f}ms  hedged={was_hedged}")

    sorted_lats = sorted(results)
    p50 = sorted_lats[int(len(sorted_lats) * 0.50)]
    p95 = sorted_lats[int(len(sorted_lats) * 0.95)]
    p99 = sorted_lats[int(len(sorted_lats) * 0.99)]
    print(f"\n=== 最终结果 ===")
    print(f"p50={p50:.0f}ms  p95={p95:.0f}ms  p99={p99:.0f}ms")
    print(f"令牌桶剩余: {client.tokens:.1f}")


if __name__ == "__main__":
    asyncio.run(main())

运行方式:

pip install ddsketch
python adaptive_hedged_client.py

关键参数调整说明:

参数 作用 调参方向
HEDGE_QUANTILE 基准分位数 0.90 更激进(更多对冲),0.99 更保守
HEDGE_MULTIPLIER 阈值放大系数 1.2 更激进,2.0 更保守
WINDOW_SECONDS 窗口周期 短周期跟得上漂移,但样本少;长周期更稳定
TOKEN_BUCKET_REFILL_RATE 对冲流量上限 设为下游 QPS 的 5-10% 通常安全

令牌桶容量怎么定

令牌桶容量决定了短时间窗口内允许的对冲请求突发量。一个实用的估算方法:

容量 C ≈ 下游单实例 QPS × 扇出因子 × 可接受额外负载比例 × 突发时间窗口(秒)

举例:下游 QPS = 1000,扇出 = 5,允许额外 5% 负载,突发窗口 1 秒:

C ≈ 1000 × 5 × 0.05 × 1 = 250

填充速率 r = C / 窗口秒数,保证持续负载不超过 5%。

需要注意的边界条件

对冲不是重试。 对冲请求发给同一个下游实例(或同质实例),不是换一个不同的服务。如果下游整体过载,对冲只会加剧问题——令牌桶就是为此存在的。

取消要干净。 对冲请求返回后,必须取消另一个请求,而不是让它继续消耗下游资源。上面的代码用 task.cancel() + await 确保取消传播到下游。

DDSketch 合并适合分布式部署。 如果你的客户端是多实例部署,每个实例维护自己的 sketch,定期合并到中心节点计算全局阈值,再下发。DDSketch 的 merge 操作是 O(1) 级别,不会成为瓶颈。

冷启动问题。 新部署时 sketch 没有历史数据,前 20 个请求不会发对冲。可以用上一个版本的 sketch 快照做初始化,或者用静态阈值做过渡。

上线检查清单

  1. 确认拓扑是扇出型。 串行调用链的对冲逻辑不同,不要套用同一套参数。
  2. 先在影子环境跑。 对冲请求写影子标记,下游识别后正常处理但不计入业务逻辑。
  3. 监控三个指标: p99 延迟变化、对冲请求占比(hedged / total)、下游总 QPS 增幅。
  4. 令牌桶参数从保守开始。 先设 refill_rate = 下游 QPS × 2%,观察对冲占比,再逐步放开。
  5. 窗口周期对齐部署周期。 如果你每 5 分钟发一次部署,窗口设 30 秒到 1 分钟足够跟上漂移。
  6. 加兜底硬超时。 即使对冲机制正常,也要设一个绝对超时(比如 p99.9 × 3),防止极端情况无限等待。

扇出架构的 p99 问题本质上是统计叠加,不是单点故障。自适应对冲请求用统计方法找到"该发对冲的时刻",用限流手段控制"对冲的代价",两者配合才能在不显著放大负载的前提下,把尾部延迟削平。


相关推荐