你监控每个服务的 p99 都在 200ms 以内,但网关层的 p99 却飙到 800ms——这不是监控出了问题,而是扇出架构里"慢但没挂"的请求在层层叠加。Prathamesh Bhope 在这篇文章里提出了一套自适应对冲请求(adaptive hedged request)方案:用 DDSketch 实时估算分位数决定何时发对冲请求,用滑动窗口应对分布漂移,用令牌桶限制对冲请求总量,最终把 p99 延迟压低了 74%。
扇出架构的延迟叠加效应
扇出(fan-out)是微服务里最常见的拓扑:一个请求到达网关后,并行调用 N 个下游服务,等最慢的那个返回才算完成。假设每个服务 99% 的请求在 200ms 内完成,但各有 1% 的请求要 600ms。如果扇出因子是 5:
- 单服务 p99 = 600ms
- 网关层 p99 ≈ 1 − (0.99)^5 ≈ 4.9% 的请求会撞上至少一个慢下游,p99 跳到 600ms 甚至更高
这就是"straggler"问题——请求没有失败,只是慢了。传统超时策略要么太激进(误杀正常慢请求),要么太保守(p99 没改善)。重试也不合适:慢请求最终会完成,重试只是多了一份负载。
Hedged Request:发一个对冲请求,取最快的
Google 2015 年的论文《The Tail at Scale》提出了 hedged request:先发一个正常请求,如果在"合理时间"内没收到响应,再发一个对冲请求,两个都等,取最先返回的,取消另一个。
核心问题变成:"合理时间"怎么定?
- 固定阈值(比如 200ms)在负载变化时会失效:低负载时 200ms 太晚,高负载时 200ms 太早,大量对冲请求反而加剧延迟
- 需要一个能跟随实时分布动态调整的阈值
三组件自适应对冲机制
Bhope 的方案由三个组件协同工作:
1. DDSketch 实时分位数估算
DDSketch 是 Datadog 提出的分位数草图算法,核心特点:
- 可合并(mergeable):多个实例的 sketch 可以合并,适合分布式场景
- 相对误差保证:设定 α 后,任何分位数的相对误差不超过 α
- 内存可控:用 collapsing 机制压缩低频桶
用它来实时追踪下游服务的 p95 或 p99,作为对冲阈值的基准。比如设定阈值 = p95 × 1.5,意味着"如果一个请求已经比 95% 的请求慢了 50%,就发对冲"。
2. 滑动窗口应对分布漂移
服务延迟分布不是静态的:流量高峰、部署变更、缓存冷启动都会让分布漂移。如果 sketch 无限累积,旧数据会拖累阈值判断。
方案是窗口化旋转:维护两个 sketch——当前窗口和上一个窗口。当前窗口接收新数据,上一个窗口提供历史参考。窗口到期时,当前窗口变为上一个窗口,新的当前窗口从零开始。阈值计算基于两个窗口的加权合并:
effective_p95 = 0.7 × current_p95 + 0.3 × previous_p95
这保证了阈值既跟得上短期变化,又不被瞬时抖动带飞。
3. 令牌桶限制对冲放大
对冲请求本质上是多发一份流量。如果下游已经过载,大量对冲请求会让情况更糟。令牌桶(token bucket)是经典的限流手段:
- 桶容量 C:允许短时间内最多 C 个对冲请求
- 填充速率 r:每秒恢复 r 个令牌
只有拿到令牌的对冲请求才会发出,拿不到就等原始请求完成。这把对冲请求的额外负载限制在可控范围内。
实战:用 Python 实现一个自适应 Hedged Client
下面是一个可运行的简化实现,包含 DDSketch 阈值计算、窗口旋转和令牌桶限流。依赖 ddsketch 库(pip install ddsketch)。
"""
Adaptive Hedged Request Client — 简化演示版
pip install ddsketch
"""
import asyncio
import time
import random
from ddsketch import DDSketch
# ---------- 配置 ----------
HEDGE_ALPHA = 0.01 # DDSketch 相对误差
HEDGE_QUANTILE = 0.95 # 用 p95 作为基准
HEDGE_MULTIPLIER = 1.5 # 阈值 = p95 × 1.5
WINDOW_SECONDS = 30 # 窗口旋转周期
WINDOW_WEIGHT_CURRENT = 0.7 # 当前窗口权重
TOKEN_BUCKET_CAPACITY = 20 # 令牌桶容量
TOKEN_BUCKET_REFILL_RATE = 5 # 每秒恢复令牌数
MAX_CONCURRENT_HEDGES = 3 # 同时最多几个对冲请求
class AdaptiveHedgedClient:
def __init__(self):
# 两个 sketch 窗口
self.current_sketch = DDSketch(HEDGE_ALPHA)
self.previous_sketch = DDSketch(HEDGE_ALPHA)
self.window_start = time.monotonic()
# 令牌桶
self.tokens = TOKEN_BUCKET_CAPACITY
self.last_refill = time.monotonic()
self._lock = asyncio.Lock()
def _refill_tokens(self):
now = time.monotonic()
elapsed = now - self.last_refill
self.tokens = min(
TOKEN_BUCKET_CAPACITY,
self.tokens + elapsed * TOKEN_BUCKET_REFILL_RATE,
)
self.last_refill = now
async def _try_acquire_token(self) -> bool:
async with self._lock:
self._refill_tokens()
if self.tokens >= 1:
self.tokens -= 1
return True
return False
def _rotate_window_if_needed(self):
now = time.monotonic()
if now - self.window_start >= WINDOW_SECONDS:
self.previous_sketch = self.current_sketch
self.current_sketch = DDSketch(HEDGE_ALPHA)
self.window_start = now
def _get_hedge_threshold(self) -> float | None:
"""计算对冲阈值,窗口数据不足时返回 None(不发对冲)"""
self._rotate_window_if_needed()
# 当前窗口必须有足够样本
if self.current_sketch.count < 20:
# 样本不足,只用上一个窗口(如果有的话)
if self.previous_sketch.count >= 20:
return self.previous_sketch.get_quantile_value(HEDGE_QUANTILE) * HEDGE_MULTIPLIER
return None
current_p95 = self.current_sketch.get_quantile_value(HEDGE_QUANTILE)
if self.previous_sketch.count >= 20:
previous_p95 = self.previous_sketch.get_quantile_value(HEDGE_QUANTILE)
effective_p95 = (
WINDOW_WEIGHT_CURRENT * current_p95
+ (1 - WINDOW_WEIGHT_CURRENT) * previous_p95
)
else:
effective_p95 = current_p95
return effective_p95 * HEDGE_MULTIPLIER
def _record_latency(self, latency_ms: float):
self._rotate_window_if_needed()
self.current_sketch.add(latency_ms)
async def call_with_hedge(self, downstream_call, label: str = "") -> tuple:
"""
downstream_call: async callable,返回 (result, latency_ms)
返回: (result, was_hedged: bool, original_latency_ms, hedge_latency_ms|None)
"""
threshold = self._get_hedge_threshold()
# 发原始请求
start = time.monotonic()
orig_task = asyncio.create_task(downstream_call())
if threshold is None:
# 没有足够数据,不发对冲
result, lat = await orig_task
self._record_latency(lat)
return (result, False, lat, None)
# 等到阈值时间,看原始请求是否已返回
hedge_task = None
try:
remaining = threshold / 1000.0 - (time.monotonic() - start)
if remaining > 0:
done, _ = await asyncio.wait({orig_task}, timeout=remaining)
if done:
result, lat = orig_task.result()
self._record_latency(lat)
return (result, False, lat, None)
except asyncio.CancelledError:
pass
# 原始请求还没返回 → 尝试发对冲
can_hedge = await self._try_acquire_token()
if can_hedge:
hedge_task = asyncio.create_task(downstream_call())
# 等任意一个完成
done, pending = await asyncio.wait(
{orig_task, hedge_task}, return_when=asyncio.FIRST_COMPLETED
)
# 取最先完成的结果
winner = done.pop()
result, lat = winner.result()
self._record_latency(lat)
# 取另一个的延迟(不等结果,只记录取消情况)
loser = pending.pop()
loser.cancel()
try:
await loser
except (asyncio.CancelledError, Exception):
pass
orig_lat = lat if winner == orig_task else None
hedge_lat = lat if winner == hedge_task else None
return (result, True, orig_lat, hedge_lat)
else:
# 没令牌,只能等原始请求
result, lat = await orig_task
self._record_latency(lat)
return (result, False, lat, None)
# ---------- 模拟下游服务 ----------
async def mock_downstream():
"""模拟下游:90% 在 100ms 内,5% 在 300ms,5% 在 800ms(straggler)"""
r = random.random()
if r < 0.90:
lat = random.uniform(80, 120)
elif r < 0.95:
lat = random.uniform(280, 320)
else:
lat = random.uniform(750, 850)
await asyncio.sleep(lat / 1000.0)
return ("ok", lat)
# ---------- 运行演示 ----------
async def main():
client = AdaptiveHedgedClient()
results = []
# 发 200 个请求,观察 p99 变化
for i in range(200):
result, was_hedged, orig_lat, hedge_lat = await client.call_with_hedge(
mock_downstream, label=f"req-{i}"
)
effective_lat = orig_lat or hedge_lat
results.append(effective_lat)
if i % 50 == 0:
sorted_lats = sorted(results)
p50 = sorted_lats[int(len(sorted_lats) * 0.50)]
p95 = sorted_lats[int(len(sorted_lats) * 0.95)]
p99 = sorted_lats[int(len(sorted_lats) * 0.99)]
print(f"[{i:>3} reqs] p50={p50:.0f}ms p95={p95:.0f}ms p99={p99:.0f}ms hedged={was_hedged}")
sorted_lats = sorted(results)
p50 = sorted_lats[int(len(sorted_lats) * 0.50)]
p95 = sorted_lats[int(len(sorted_lats) * 0.95)]
p99 = sorted_lats[int(len(sorted_lats) * 0.99)]
print(f"\n=== 最终结果 ===")
print(f"p50={p50:.0f}ms p95={p95:.0f}ms p99={p99:.0f}ms")
print(f"令牌桶剩余: {client.tokens:.1f}")
if __name__ == "__main__":
asyncio.run(main())
运行方式:
pip install ddsketch
python adaptive_hedged_client.py
关键参数调整说明:
| 参数 | 作用 | 调参方向 |
|---|---|---|
HEDGE_QUANTILE |
基准分位数 | 0.90 更激进(更多对冲),0.99 更保守 |
HEDGE_MULTIPLIER |
阈值放大系数 | 1.2 更激进,2.0 更保守 |
WINDOW_SECONDS |
窗口周期 | 短周期跟得上漂移,但样本少;长周期更稳定 |
TOKEN_BUCKET_REFILL_RATE |
对冲流量上限 | 设为下游 QPS 的 5-10% 通常安全 |
令牌桶容量怎么定
令牌桶容量决定了短时间窗口内允许的对冲请求突发量。一个实用的估算方法:
容量 C ≈ 下游单实例 QPS × 扇出因子 × 可接受额外负载比例 × 突发时间窗口(秒)
举例:下游 QPS = 1000,扇出 = 5,允许额外 5% 负载,突发窗口 1 秒:
C ≈ 1000 × 5 × 0.05 × 1 = 250
填充速率 r = C / 窗口秒数,保证持续负载不超过 5%。
需要注意的边界条件
对冲不是重试。 对冲请求发给同一个下游实例(或同质实例),不是换一个不同的服务。如果下游整体过载,对冲只会加剧问题——令牌桶就是为此存在的。
取消要干净。 对冲请求返回后,必须取消另一个请求,而不是让它继续消耗下游资源。上面的代码用 task.cancel() + await 确保取消传播到下游。
DDSketch 合并适合分布式部署。 如果你的客户端是多实例部署,每个实例维护自己的 sketch,定期合并到中心节点计算全局阈值,再下发。DDSketch 的 merge 操作是 O(1) 级别,不会成为瓶颈。
冷启动问题。 新部署时 sketch 没有历史数据,前 20 个请求不会发对冲。可以用上一个版本的 sketch 快照做初始化,或者用静态阈值做过渡。
上线检查清单
- 确认拓扑是扇出型。 串行调用链的对冲逻辑不同,不要套用同一套参数。
- 先在影子环境跑。 对冲请求写影子标记,下游识别后正常处理但不计入业务逻辑。
- 监控三个指标: p99 延迟变化、对冲请求占比(hedged / total)、下游总 QPS 增幅。
- 令牌桶参数从保守开始。 先设 refill_rate = 下游 QPS × 2%,观察对冲占比,再逐步放开。
- 窗口周期对齐部署周期。 如果你每 5 分钟发一次部署,窗口设 30 秒到 1 分钟足够跟上漂移。
- 加兜底硬超时。 即使对冲机制正常,也要设一个绝对超时(比如 p99.9 × 3),防止极端情况无限等待。
扇出架构的 p99 问题本质上是统计叠加,不是单点故障。自适应对冲请求用统计方法找到"该发对冲的时刻",用限流手段控制"对冲的代价",两者配合才能在不显著放大负载的前提下,把尾部延迟削平。