Uber Eats 如何用实时序列特征与列表级排序重塑首页推荐

2026-05-22 31 预计阅读时间:1 分钟
来源:infoq.com AI 摘要 原文链接

免责声明:本文为 AI 摘要整理,建议结合原文阅读。摘要可能省略上下文、版本差异或边界条件,不作为官方说明。

预计阅读时间:15 分钟

外卖推荐和短视频推荐有一个本质区别:用户打开抖音可以刷半小时,但打开 Uber Eats 通常只想在三分钟内找到今晚吃什么。这意味着推荐系统必须在极短的交互窗口里做出精准判断,而判断的依据——用户刚刚浏览了哪些店铺、停留了多久、是否跳过了某类菜品——这些信号如果还停留在昨天的数据里,推荐就永远慢半拍。

Uber 最近对 Uber Eats Home Feed 推荐系统做了一次大幅重构:把特征新鲜度从 24 小时压缩到秒级,把手工特征换成 Transformer 序列建模,把逐条打分(pointwise)换成列表级生成式推荐(listwise GenRec)。这三步叠加的效果,是首页推荐从"基于你上周的历史"变成了"基于你刚才的点击"。

特征新鲜度:从天级到秒级

旧系统中,用户行为特征以批处理方式离线计算,T+1 甚至 T+24 才能进入模型。一个用户中午点了轻食,晚上打开 App 时首页还在推荐他上周常吃的汉堡——因为中午的行为还没进特征库。

重构后的方案引入了近实时(near real-time)用户序列特征管道:

  • 流式采集:用户每次点击、停留、加购等事件通过 Kafka 进入实时特征计算层。
  • 增量更新:序列特征(如最近浏览的 N 个店铺 ID、品类序列)在秒级窗口内完成拼接与聚合,写入在线特征存储(如 Redis 或 Feature Store)。
  • 模型侧消费:推理请求拉取特征时拿到的是几秒前的状态,而非昨天的快照。

这不仅是工程上的提速,更改变了模型能"看到"什么。序列建模需要的是时间顺序上的行为流,而不是一个扁平的统计摘要。

从手工特征到 Transformer 序列建模

旧模型依赖大量手工特征:用户过去 7 天订单数、最常点品类、平均客单价等。这些特征是静态的、聚合过的,丢失了行为的时序结构。

新方案用 Transformer 对用户实时行为序列做建模:

  • 输入是用户最近若干次交互事件(浏览、点击、加购、下单),每个事件带时间戳、店铺 ID、品类标签等。
  • Transformer 的 self-attention 自然捕捉序列中的依赖关系——比如"浏览了三家日料店但都没下单"这个模式,手工特征很难表达。
  • 序列长度可动态调整,短序列用户(新用户或低活跃用户)和长序列用户共享同一个模型架构,不需要分治。

关键收益在于:模型不再需要人工定义"用户偏好品类"这类中间概念,它直接从原始行为序列中学习偏好模式。

从逐条打分到列表级生成式推荐

传统推荐对候选列表中的每个 item 独立打分(pointwise),然后按分数排序。这忽略了一个事实:首页推荐是一个整体展示,item 之间存在上下文关系。用户刚看到一家高分日料店,紧接着再推一家类似的日料店,体验并不好——多样性不足,也没有利用"看完日料后用户可能想看甜品"这种跨品类转移模式。

Uber 引入的 listwise GenRec 把整个候选列表作为输入,一次性生成排序结果:

  • 上下文感知:模型在决定 item A 的位置时,能看到 item B、C、D 的信息,从而避免重复、增强多样性、利用品类转移。
  • 生成式范式:借鉴大语言模型的思路,把排序问题看作"生成一个最优列表"的序列决策问题,而非独立打分后拼接。
  • 实时个性化:因为输入特征是秒级更新的,列表级排序每次请求都能反映用户最新状态。

实践:搭建一个最小实时序列推荐原型

下面的示例展示如何用 Python 构建一个简化版的实时序列特征管道 + Transformer 序列编码 + 列表级排序思路。它不是 Uber 的真实实现,但涵盖了核心概念:流式序列构建、Transformer 编码、列表级重排。

1. 实时序列特征构建

from collections import deque
from datetime import datetime
import json

class UserSequenceStore:
    """模拟实时用户行为序列存储,秒级更新"""

    def __init__(self, max_len=50):
        # 用户ID -> 行为序列
        self.sequences: dict[str, deque] = {}
        self.max_len = max_len

    def append_event(self, user_id: str, event: dict):
        """接收一个实时事件并追加到序列尾部"""
        if user_id not in self.sequences:
            self.sequences[user_id] = deque(maxlen=self.max_len)
        # 事件结构:动作类型、店铺ID、品类、时间戳
        event["timestamp"] = datetime.utcnow().isoformat()
        self.sequences[user_id].append(event)

    def get_sequence(self, user_id: str) -> list[dict]:
        """推理时拉取最新序列"""
        if user_id not in self.sequences:
            return []
        return list(self.sequences[user_id])


# ---- 模拟实时事件流入 ----
store = UserSequenceStore(max_len=20)

# 用户 u123 刚刚浏览了三家日料店,又看了一家甜品店
for shop_id, category, action in [
    ("s001", "japanese", "view"),
    ("s002", "japanese", "view"),
    ("s003", "japanese", "view_dwell"),  # 停留较久
    ("s004", "dessert",  "view"),
]:
    store.append_event("u123", {
        "action": action,
        "shop_id": shop_id,
        "category": category,
    })

print(json.dumps(store.get_sequence("u123"), indent=2, ensure_ascii=False))

运行后你会看到带时间戳的行为序列,这正是 Transformer 需要的输入格式。

2. Transformer 序列编码器

import torch
import torch.nn as nn

class SequenceEncoder(nn.Module):
    """轻量 Transformer 编码器,将行为序列编码为用户偏好向量"""

    def __init__(self, num_categories=30, num_actions=5,
                 d_model=64, nhead=4, num_layers=2):
        super().__init__()
        self.cat_embed = nn.Embedding(num_categories, d_model)
        self.act_embed = nn.Embedding(num_actions, d_model)
        self.pos_embed = nn.Embedding(50, d_model)  # 最大序列长度50

        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model, nhead, batch_first=True),
            num_layers=num_layers,
        )

    def forward(self, cat_ids: torch.LongTensor,
                act_ids: torch.LongTensor) -> torch.Tensor:
        """
        cat_ids: (batch, seq_len) 品类ID序列
        act_ids: (batch, seq_len) 动作ID序列
        返回: (batch, d_model) 用户偏好向量
        """
        seq_len = cat_ids.size(1)
        positions = torch.arange(seq_len, device=cat_ids.device).unsqueeze(0)
        x = self.cat_embed(cat_ids) + self.act_embed(act_ids) + self.pos_embed(positions)
        x = self.transformer(x)  # (batch, seq_len, d_model)
        # 取最后一个位置的输出作为用户表示
        return x[:, -1, :]  # (batch, d_model)


# ---- 使用示例 ----
encoder = SequenceEncoder()
# 品类映射: japanese=1, dessert=2, ...; 动作映射: view=0, view_dwell=1, ...
cat_seq = torch.tensor([[1, 1, 1, 2]])   # 用户 u123 的品类序列
act_seq = torch.tensor([[0, 0, 1, 0]])   # 对应动作序列
user_vec = encoder(cat_seq, act_seq)
print(f"用户偏好向量维度: {user_vec.shape}")  # (1, 64)

3. 列表级重排(简化版 GenRec 思路)

import numpy as np

def listwise_rerank(candidate_items: list[dict],
                    user_vec: np.ndarray,
                    diversity_weight: float = 0.3) -> list[dict]:
    """
    简化版列表级重排:
    1. 计算每个候选与用户向量的相关性分数
    2. 在排序时引入多样性惩罚(同品类连续出现会被压低)
    3. 返回重排后的列表

    真正的 GenRec 会用 Transformer 对整个列表做端到端生成,
    这里用启发式方法模拟列表级上下文感知的效果。
    """
    # 假设每个候选 item 有一个预计算的 embedding
    # 相关性 = 用户向量与 item 向量的余弦相似度
    scores = []
    for item in candidate_items:
        item_vec = np.array(item["embedding"])
        relevance = np.dot(user_vec, item_vec) / (
            np.linalg.norm(user_vec) * np.linalg.norm(item_vec) + 1e-8
        )
        scores.append({"item": item, "relevance": float(relevance)})

    # 按相关性降序做初始排序
    scores.sort(key=lambda x: x["relevance"], reverse=True)

    # 列表级重排:贪心选择,加入多样性惩罚
    ranked = []
    used_categories = []
    remaining = scores.copy()

    while remaining:
        best_idx = 0
        best_score = -np.inf
        for i, entry in enumerate(remaining):
            cat = entry["item"]["category"]
            # 同品类连续出现次数越多,惩罚越大
            cat_repeat = used_categories.count(cat)
            diversity_penalty = diversity_weight * cat_repeat
            adjusted = entry["relevance"] - diversity_penalty
            if adjusted > best_score:
                best_score = adjusted
                best_idx = i

        chosen = remaining.pop(best_idx)
        ranked.append(chosen["item"])
        used_categories.append(chosen["item"]["category"])

    return ranked


# ---- 模拟候选集与重排 ----
np.random.seed(42)
user_vec_np = user_vec.detach().numpy().squeeze()

candidates = [
    {"shop_id": "s010", "category": "japanese", "embedding": np.random.randn(64)},
    {"shop_id": "s011", "category": "japanese", "embedding": np.random.randn(64)},
    {"shop_id": "s012", "category": "dessert",  "embedding": np.random.randn(64)},
    {"shop_id": "s013", "category": "chinese",   "embedding": np.random.randn(64)},
    {"shop_id": "s014", "category": "japanese", "embedding": np.random.randn(64)},
]

result = listwise_rerank(candidates, user_vec_np, diversity_weight=0.3)
for r in result:
    print(f"店铺 {r['shop_id']} | 品类 {r['category']}")

运行后你会看到品类不再连续堆叠——日料店之间被甜品和中餐穿插,这正是列表级排序的上下文感知效果。真正的 GenRec 用端到端模型替代这个启发式步骤,但核心思路一致:排序时看全局,而非逐条独立打分。

落地时的取舍与检查清单

把这套架构从论文搬到生产环境,有几件事值得提前想清楚:

实时特征管道的成本 秒级更新意味着 Kafka + 流式计算 + 在线特征存储的全链路都要低延迟。如果你的业务峰值 QPS 不高,可以先做分钟级而非秒级,成本差异很大。Uber 之所以能做到秒级,和它已有的实时基础设施直接相关。

序列长度与冷启动 Transformer 对长序列效果好,但新用户序列为空或极短。需要设计 fallback:短序列用户可以回退到基于人口统计或热门榜单的简单策略,或者用预训练的通用序列模式做初始化。

列表级排序的推理开销 Pointwise 打分可以对每个候选独立并行计算;listwise 模型一次处理整个候选集,推理复杂度更高。实际部署中通常先用 pointwise 做粗排缩小候选集(比如从 1000 缩到 50),再对 50 个候选做 listwise 精排——Uber 的流程也是这个两阶段结构。

多样性 vs 相关性的平衡 列表级排序天然有利于多样性,但业务上有时需要强相关性优先(比如促销活动期间)。diversity_weight 这类参数需要可配置、可 A/B 测试,而不是硬编码。

检查清单:

  • [ ] 实时特征管道的延迟是否满足业务 SLA(秒级 / 分钟级 / 小时级)?
  • [ ] 序列特征存储的内存与过期策略是否设计好?
  • [ ] 冷启动用户是否有 fallback 推荐路径?
  • [ ] 粗排 → 精排的两阶段流程是否就绪?
  • [ ] 多样性参数是否可动态调整并支持 A/B 实验?
  • [ ] Transformer 序列编码器的推理延迟是否在 p99 预算内?

Uber 这次重构的核心洞察并不复杂:推荐系统的上限往往不是模型架构,而是模型能看到什么数据、在什么时间看到。把特征新鲜度从天级压到秒级,再配上能消化时序信息的 Transformer 和能感知上下文的列表级排序,推荐就从"猜你大概喜欢什么"变成了"知道你现在想要什么"。这个思路不仅适用于外卖,任何短交互窗口的推荐场景——电商首页、酒店搜索、即时配送——都可以复用。


相关推荐