外卖推荐和短视频推荐有一个本质区别:用户打开抖音可以刷半小时,但打开 Uber Eats 通常只想在三分钟内找到今晚吃什么。这意味着推荐系统必须在极短的交互窗口里做出精准判断,而判断的依据——用户刚刚浏览了哪些店铺、停留了多久、是否跳过了某类菜品——这些信号如果还停留在昨天的数据里,推荐就永远慢半拍。
Uber 最近对 Uber Eats Home Feed 推荐系统做了一次大幅重构:把特征新鲜度从 24 小时压缩到秒级,把手工特征换成 Transformer 序列建模,把逐条打分(pointwise)换成列表级生成式推荐(listwise GenRec)。这三步叠加的效果,是首页推荐从"基于你上周的历史"变成了"基于你刚才的点击"。
特征新鲜度:从天级到秒级
旧系统中,用户行为特征以批处理方式离线计算,T+1 甚至 T+24 才能进入模型。一个用户中午点了轻食,晚上打开 App 时首页还在推荐他上周常吃的汉堡——因为中午的行为还没进特征库。
重构后的方案引入了近实时(near real-time)用户序列特征管道:
- 流式采集:用户每次点击、停留、加购等事件通过 Kafka 进入实时特征计算层。
- 增量更新:序列特征(如最近浏览的 N 个店铺 ID、品类序列)在秒级窗口内完成拼接与聚合,写入在线特征存储(如 Redis 或 Feature Store)。
- 模型侧消费:推理请求拉取特征时拿到的是几秒前的状态,而非昨天的快照。
这不仅是工程上的提速,更改变了模型能"看到"什么。序列建模需要的是时间顺序上的行为流,而不是一个扁平的统计摘要。
从手工特征到 Transformer 序列建模
旧模型依赖大量手工特征:用户过去 7 天订单数、最常点品类、平均客单价等。这些特征是静态的、聚合过的,丢失了行为的时序结构。
新方案用 Transformer 对用户实时行为序列做建模:
- 输入是用户最近若干次交互事件(浏览、点击、加购、下单),每个事件带时间戳、店铺 ID、品类标签等。
- Transformer 的 self-attention 自然捕捉序列中的依赖关系——比如"浏览了三家日料店但都没下单"这个模式,手工特征很难表达。
- 序列长度可动态调整,短序列用户(新用户或低活跃用户)和长序列用户共享同一个模型架构,不需要分治。
关键收益在于:模型不再需要人工定义"用户偏好品类"这类中间概念,它直接从原始行为序列中学习偏好模式。
从逐条打分到列表级生成式推荐
传统推荐对候选列表中的每个 item 独立打分(pointwise),然后按分数排序。这忽略了一个事实:首页推荐是一个整体展示,item 之间存在上下文关系。用户刚看到一家高分日料店,紧接着再推一家类似的日料店,体验并不好——多样性不足,也没有利用"看完日料后用户可能想看甜品"这种跨品类转移模式。
Uber 引入的 listwise GenRec 把整个候选列表作为输入,一次性生成排序结果:
- 上下文感知:模型在决定 item A 的位置时,能看到 item B、C、D 的信息,从而避免重复、增强多样性、利用品类转移。
- 生成式范式:借鉴大语言模型的思路,把排序问题看作"生成一个最优列表"的序列决策问题,而非独立打分后拼接。
- 实时个性化:因为输入特征是秒级更新的,列表级排序每次请求都能反映用户最新状态。
实践:搭建一个最小实时序列推荐原型
下面的示例展示如何用 Python 构建一个简化版的实时序列特征管道 + Transformer 序列编码 + 列表级排序思路。它不是 Uber 的真实实现,但涵盖了核心概念:流式序列构建、Transformer 编码、列表级重排。
1. 实时序列特征构建
from collections import deque
from datetime import datetime
import json
class UserSequenceStore:
"""模拟实时用户行为序列存储,秒级更新"""
def __init__(self, max_len=50):
# 用户ID -> 行为序列
self.sequences: dict[str, deque] = {}
self.max_len = max_len
def append_event(self, user_id: str, event: dict):
"""接收一个实时事件并追加到序列尾部"""
if user_id not in self.sequences:
self.sequences[user_id] = deque(maxlen=self.max_len)
# 事件结构:动作类型、店铺ID、品类、时间戳
event["timestamp"] = datetime.utcnow().isoformat()
self.sequences[user_id].append(event)
def get_sequence(self, user_id: str) -> list[dict]:
"""推理时拉取最新序列"""
if user_id not in self.sequences:
return []
return list(self.sequences[user_id])
# ---- 模拟实时事件流入 ----
store = UserSequenceStore(max_len=20)
# 用户 u123 刚刚浏览了三家日料店,又看了一家甜品店
for shop_id, category, action in [
("s001", "japanese", "view"),
("s002", "japanese", "view"),
("s003", "japanese", "view_dwell"), # 停留较久
("s004", "dessert", "view"),
]:
store.append_event("u123", {
"action": action,
"shop_id": shop_id,
"category": category,
})
print(json.dumps(store.get_sequence("u123"), indent=2, ensure_ascii=False))
运行后你会看到带时间戳的行为序列,这正是 Transformer 需要的输入格式。
2. Transformer 序列编码器
import torch
import torch.nn as nn
class SequenceEncoder(nn.Module):
"""轻量 Transformer 编码器,将行为序列编码为用户偏好向量"""
def __init__(self, num_categories=30, num_actions=5,
d_model=64, nhead=4, num_layers=2):
super().__init__()
self.cat_embed = nn.Embedding(num_categories, d_model)
self.act_embed = nn.Embedding(num_actions, d_model)
self.pos_embed = nn.Embedding(50, d_model) # 最大序列长度50
self.transformer = nn.TransformerEncoder(
nn.TransformerEncoderLayer(d_model, nhead, batch_first=True),
num_layers=num_layers,
)
def forward(self, cat_ids: torch.LongTensor,
act_ids: torch.LongTensor) -> torch.Tensor:
"""
cat_ids: (batch, seq_len) 品类ID序列
act_ids: (batch, seq_len) 动作ID序列
返回: (batch, d_model) 用户偏好向量
"""
seq_len = cat_ids.size(1)
positions = torch.arange(seq_len, device=cat_ids.device).unsqueeze(0)
x = self.cat_embed(cat_ids) + self.act_embed(act_ids) + self.pos_embed(positions)
x = self.transformer(x) # (batch, seq_len, d_model)
# 取最后一个位置的输出作为用户表示
return x[:, -1, :] # (batch, d_model)
# ---- 使用示例 ----
encoder = SequenceEncoder()
# 品类映射: japanese=1, dessert=2, ...; 动作映射: view=0, view_dwell=1, ...
cat_seq = torch.tensor([[1, 1, 1, 2]]) # 用户 u123 的品类序列
act_seq = torch.tensor([[0, 0, 1, 0]]) # 对应动作序列
user_vec = encoder(cat_seq, act_seq)
print(f"用户偏好向量维度: {user_vec.shape}") # (1, 64)
3. 列表级重排(简化版 GenRec 思路)
import numpy as np
def listwise_rerank(candidate_items: list[dict],
user_vec: np.ndarray,
diversity_weight: float = 0.3) -> list[dict]:
"""
简化版列表级重排:
1. 计算每个候选与用户向量的相关性分数
2. 在排序时引入多样性惩罚(同品类连续出现会被压低)
3. 返回重排后的列表
真正的 GenRec 会用 Transformer 对整个列表做端到端生成,
这里用启发式方法模拟列表级上下文感知的效果。
"""
# 假设每个候选 item 有一个预计算的 embedding
# 相关性 = 用户向量与 item 向量的余弦相似度
scores = []
for item in candidate_items:
item_vec = np.array(item["embedding"])
relevance = np.dot(user_vec, item_vec) / (
np.linalg.norm(user_vec) * np.linalg.norm(item_vec) + 1e-8
)
scores.append({"item": item, "relevance": float(relevance)})
# 按相关性降序做初始排序
scores.sort(key=lambda x: x["relevance"], reverse=True)
# 列表级重排:贪心选择,加入多样性惩罚
ranked = []
used_categories = []
remaining = scores.copy()
while remaining:
best_idx = 0
best_score = -np.inf
for i, entry in enumerate(remaining):
cat = entry["item"]["category"]
# 同品类连续出现次数越多,惩罚越大
cat_repeat = used_categories.count(cat)
diversity_penalty = diversity_weight * cat_repeat
adjusted = entry["relevance"] - diversity_penalty
if adjusted > best_score:
best_score = adjusted
best_idx = i
chosen = remaining.pop(best_idx)
ranked.append(chosen["item"])
used_categories.append(chosen["item"]["category"])
return ranked
# ---- 模拟候选集与重排 ----
np.random.seed(42)
user_vec_np = user_vec.detach().numpy().squeeze()
candidates = [
{"shop_id": "s010", "category": "japanese", "embedding": np.random.randn(64)},
{"shop_id": "s011", "category": "japanese", "embedding": np.random.randn(64)},
{"shop_id": "s012", "category": "dessert", "embedding": np.random.randn(64)},
{"shop_id": "s013", "category": "chinese", "embedding": np.random.randn(64)},
{"shop_id": "s014", "category": "japanese", "embedding": np.random.randn(64)},
]
result = listwise_rerank(candidates, user_vec_np, diversity_weight=0.3)
for r in result:
print(f"店铺 {r['shop_id']} | 品类 {r['category']}")
运行后你会看到品类不再连续堆叠——日料店之间被甜品和中餐穿插,这正是列表级排序的上下文感知效果。真正的 GenRec 用端到端模型替代这个启发式步骤,但核心思路一致:排序时看全局,而非逐条独立打分。
落地时的取舍与检查清单
把这套架构从论文搬到生产环境,有几件事值得提前想清楚:
实时特征管道的成本 秒级更新意味着 Kafka + 流式计算 + 在线特征存储的全链路都要低延迟。如果你的业务峰值 QPS 不高,可以先做分钟级而非秒级,成本差异很大。Uber 之所以能做到秒级,和它已有的实时基础设施直接相关。
序列长度与冷启动 Transformer 对长序列效果好,但新用户序列为空或极短。需要设计 fallback:短序列用户可以回退到基于人口统计或热门榜单的简单策略,或者用预训练的通用序列模式做初始化。
列表级排序的推理开销 Pointwise 打分可以对每个候选独立并行计算;listwise 模型一次处理整个候选集,推理复杂度更高。实际部署中通常先用 pointwise 做粗排缩小候选集(比如从 1000 缩到 50),再对 50 个候选做 listwise 精排——Uber 的流程也是这个两阶段结构。
多样性 vs 相关性的平衡 列表级排序天然有利于多样性,但业务上有时需要强相关性优先(比如促销活动期间)。diversity_weight 这类参数需要可配置、可 A/B 测试,而不是硬编码。
检查清单:
- [ ] 实时特征管道的延迟是否满足业务 SLA(秒级 / 分钟级 / 小时级)?
- [ ] 序列特征存储的内存与过期策略是否设计好?
- [ ] 冷启动用户是否有 fallback 推荐路径?
- [ ] 粗排 → 精排的两阶段流程是否就绪?
- [ ] 多样性参数是否可动态调整并支持 A/B 实验?
- [ ] Transformer 序列编码器的推理延迟是否在 p99 预算内?
Uber 这次重构的核心洞察并不复杂:推荐系统的上限往往不是模型架构,而是模型能看到什么数据、在什么时间看到。把特征新鲜度从天级压到秒级,再配上能消化时序信息的 Transformer 和能感知上下文的列表级排序,推荐就从"猜你大概喜欢什么"变成了"知道你现在想要什么"。这个思路不仅适用于外卖,任何短交互窗口的推荐场景——电商首页、酒店搜索、即时配送——都可以复用。