RocketMQ 5.5.0 开源 LiteTopic:给百万级 AI 会话开一条专属通道

2026-05-27 17 预计阅读时间:1 分钟
来源:my.oschina.net AI 摘要 原文链接

免责声明:本文为 AI 摘要整理,建议结合原文阅读。摘要可能省略上下文、版本差异或边界条件,不作为官方说明。

预计阅读时间:13 分钟

AI Agent 爆发之后,消息中间件遇到了一个新问题:每个用户会话都是一条独立的消息流,百万用户同时在线就意味着百万条轻量通道同时运转。传统 Topic 模型为每条通道分配完整的元数据与存储结构,通道数一多,Broker 内存和元数据同步就成了瓶颈。RocketMQ 5.5.0 把社区提案 RIP-83 定义的 LiteTopic 正式推入开源版本,专门应对这类"量大、权重低"的场景。

传统 Topic 模型为什么撑不住百万会话

RocketMQ 原有的 Topic 机制是为"少量 Topic、大量 Queue"设计的——每个 Topic 在 Broker 上注册完整元数据,NameServer 同步路由信息,消费进度独立持久化。这套机制在电商、金融等几十到几百个 Topic 的场景下非常稳,但搬到 AI 会话场景就暴露了两个硬伤:

  • 元数据膨胀:每个会话通道如果映射为一个 Topic,百万会话就是百万条 Topic 元数据,NameServer 心跳包和 Broker 内存占用直线上升。
  • 消费进度碎片化:每个 Topic 独立维护消费 offset,百万通道意味着百万份 checkpoint 文件,磁盘 IO 和恢复时间都不可控。

LiteTopic 的核心思路是:把"通道"从 Topic 级别降维到 Queue 内部的逻辑分区,共享 Topic 的元数据壳子,只在消息体和索引层标注通道标识。百万通道共享一个 Topic 的路由和存储框架,元数据开销从 O(N) 降到 O(1)。

LiteTopic 的三处针对性设计

轻量通道管理

LiteTopic 引入 Channel ID 概念。同一个 LiteTopic 下,每条消息携带一个短字符串作为通道标识,Broker 在 CommitLog 写入时保留 Channel ID,ConsumeQueue 索引中增加 Channel ID 字段。客户端订阅时可以指定 Channel ID 过滤,只拉取自己会话的消息。

这意味着创建一个新会话通道不需要向 Broker 注册任何元数据——生产者直接往 LiteTopic 发消息、带上 Channel ID 即可,通道"诞生"的代价几乎为零。

消费状态持久化

百万通道的消费进度不可能各自维护文件。LiteTopic 采用 集中式进度存储:同一 LiteTopic 下所有 Channel 的消费 offset 合并存储在一张内部进度表中,按 Channel ID 索引。这张表本身作为一个特殊的内部 Topic 存储,Broker 定期批量刷盘,恢复时一次加载即可。

对于 AI 会话场景,大量通道的生命周期很短(用户对话结束,通道就不再活跃),LiteTopic 还支持对长期无消费进度的 Channel ID 做 自动淘汰,避免进度表无限膨胀。

事件驱动分发

AI Agent 的会话消费模式跟传统批量消费不同:一条消息到达后,Agent 需要立刻响应,而不是等攒够一批再处理。LiteTopic 在客户端侧提供了 事件驱动推送 模式——Broker 检测到某 Channel 有新消息写入后,主动通知订阅该 Channel 的客户端,客户端收到事件再执行拉取,省去了长轮询的空转开销。

这套机制对"低延迟、低吞吐"的会话通道尤为有效:消息稀疏但响应要求高,长轮询大部分时间在空等,事件推送把延迟压到毫秒级。

实际上手:创建 LiteTopic 并发送 AI 会话消息

以下示例基于 RocketMQ 5.5.0,演示从部署到收发消息的完整流程。假设你已经有一套运行中的 RocketMQ 5.x 集群。

1. 用 mqadmin 创建 LiteTopic

# 创建 LiteTopic,名称为 ai-chat-lite
# liteTopicEnable=true 是关键参数,表示启用 LiteTopic 模型
sh mqadmin updateTopic \
  -n localhost:9876 \
  -c DefaultCluster \
  -t ai-chat-lite \
  -a +liteTopicEnable \
  -r 4 \
  -w 4

# 验证 Topic 属性
sh mqadmin topicStatus -n localhost:9876 -t ai-chat-lite

-a +liteTopicEnable 向 Topic 属性中追加 LiteTopic 标记。-r 4 -w 4 分别设置可读和可写 Queue 数量——注意这里的 Queue 数量是物理 Queue,百万 Channel 在逻辑层面共享这几个物理 Queue。

2. 生产者发送带 Channel ID 的消息

import grpc
from rocketmq.client import Producer, Message

producer = Producer(
    name_server="localhost:9876",
    group="ai-chat-producer-group"
)
producer.start()

def send_chat_message(user_session_id: str, role: str, content: str):
    """向 LiteTopic 发送一条 AI 会话消息,Channel ID = 用户会话 ID"""
    msg = Message()
    msg.topic = "ai-chat-lite"
    # Channel ID 直接复用用户会话 ID,实现会话隔离
    msg.channel_id = user_session_id
    msg.body = f'{{"role": "{role}", "content": "{content}"}}'.encode("utf-8")
    msg.tags = "chat"

    result = producer.send_sync(msg)
    print(f"Sent to channel [{user_session_id}], msg_id={result.msg_id}, offset={result.offset}")

# 模拟三个并发用户会话
send_chat_message("session-user-001", "user", "帮我查一下今天的天气")
send_chat_message("session-user-002", "user", "生成一段 Python 快排代码")
send_chat_message("session-user-003", "user", "总结这篇论文的核心观点")

producer.shutdown()

关键点:msg.channel_id 字段是 LiteTopic 的核心入口。同一个 Topic 下,不同 Channel ID 的消息在物理 Queue 上混合存储,但客户端消费时可以按 Channel ID 精确过滤。

3. 消费者订阅指定 Channel

from rocketmq.client import PushConsumer, FilterExpression

consumer = PushConsumer(
    name_server="localhost:9876",
    group="ai-chat-consumer-group",
    # 事件驱动模式:有新消息时 Broker 主动推送通知
    delivery_mode="event_driven"
)

# 只订阅 session-user-001 这个通道的消息
consumer.subscribe(
    "ai-chat-lite",
    FilterExpression(
        channel_id="session-user-001",
        type="channel"
    ),
    lambda msg: print(f"Received: channel={msg.channel_id}, body={msg.body.decode()}")
)

consumer.start()

# 保持消费运行...实际项目中可结合 async 框架
import time
time.sleep(60)
consumer.shutdown()

FilterExpressiontype="channel" 表示按 Channel ID 过滤,而非传统的 Tag 过滤。delivery_mode="event_driven" 启用事件驱动推送,适合 AI Agent 的即时响应场景。

注意:上述 Python SDK 接口基于 RocketMQ 5.x gRPC 协议的预期封装形式。5.5.0 刚发布,开源 Python SDK 对 LiteTopic 的完整支持可能仍在迭代,实际使用时请对照官方 SDK 最新文档确认 channel_idFilterExpression 的具体参数名。如果 SDK 尚未封装 Channel 过滤,可以退回 gRPC 原生调用,在 ReceiveMessageRequest 中设置 filterExpression 字段。

4. 用 YAML 定义一组 AI Agent 服务的消费配置

在 Kubernetes 环境下部署多个 Agent 服务实例时,可以用 ConfigMap 统一管理 LiteTopic 消费参数:

apiVersion: v1
kind: ConfigMap
metadata:
  name: ai-agent-rocketmq-config
data:
  NAMESERVER_ADDR: "rocketmq-nameserver:9876"
  CONSUMER_GROUP: "ai-agent-consumer-group"
  LITE_TOPIC: "ai-chat-lite"
  DELIVERY_MODE: "event_driven"
  # 单个实例最大订阅通道数,防止一个 Pod 负载过重
  MAX_CHANNELS_PER_INSTANCE: "1000"
  # 通道自动淘汰阈值:连续 30 分钟无消费即视为不活跃
  CHANNEL_EVICTION_TIMEOUT_MIN: "30"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ai-agent-worker
spec:
  replicas: 10
  selector:
    matchLabels:
      app: ai-agent-worker
  template:
    metadata:
      labels:
        app: ai-agent-worker
    spec:
      containers:
        - name: agent
          image: your-org/ai-agent-worker:latest
          envFrom:
            - configMapRef:
                name: ai-agent-rocketmq-config
          resources:
            requests:
              cpu: "500m"
              memory: "512Mi"
            limits:
              cpu: "2"
              memory: "2Gi"

每个 Pod 启动时从 ConfigMap 读取 LiteTopic 配置,根据负载均衡策略分配一批 Channel ID 进行订阅。10 个 Pod 各承担约 1000 个活跃通道,合计可服务万级并发会话;横向扩容 Pod 数量即可线性提升通道容量。

LiteTopic 的适用边界与选型建议

LiteTopic 不是万能替代,它针对的是 "通道数极多、单通道吞吐低" 的场景。选型时可以对照以下判断:

场景特征 推荐 Topic 模型
通道数 < 1000,单通道吞吐 > 1000 msg/s 传统 Topic
通道数 > 10 万,单通道吞吐 < 10 msg/s,低延迟要求 LiteTopic
通道数中等,但消息体大、需严格顺序 传统 Topic + OrderlyMessage
AI Agent 会话、异步任务回调、IoT 设备心跳 LiteTopic

几个需要注意的边界:

  • 单通道吞吐上限:LiteTopic 的物理 Queue 数量有限,所有 Channel 共享这些 Queue 的写入带宽。如果少数 Channel 突然产生高频消息,可能挤占其他 Channel 的延迟。建议对单 Channel 做速率限制。
  • 消息过滤开销:Channel ID 过滤在 Broker 端执行,百万 Channel 同时有活跃订阅时,Broker 的过滤计算量不可忽视。实际部署中建议客户端侧做 Channel 分片,每个实例只订阅自己负责的 Channel 子集。
  • 事务消息兼容性:5.5.0 开源版本中 LiteTopic 与半事务消息的联合使用仍在完善,如果业务依赖事务消息保证一致性,建议继续使用传统 Topic。

从云上到开源:迁移路径

LiteTopic 此前已在阿里云云消息队列 RocketMQ 版上商业化运行,支撑了大规模 AI 会话和异步任务场景。5.5.0 是该模型首次进入开源主线,意味着自建集群的用户也能直接使用。

从云上迁移到开源自建的建议路径:

  1. 评估通道规模:统计业务中同时活跃的会话/任务通道数,超过 1 万就值得考虑 LiteTopic。
  2. 灰度替换:先选一个低风险业务(如异步任务回调)用 LiteTopic 试点,确认延迟和稳定性符合预期后再扩展到 AI 会话主链路。
  3. 监控指标:重点关注 Broker 的 liteTopicChannelCount(活跃通道数)、liteTopicFilterLatency(Channel 过滤延迟)和 consumeProgressTableSize(进度表大小)三个新增指标。
  4. 淘汰策略调优:根据业务会话的平均生命周期,调整 CHANNEL_EVICTION_TIMEOUT_MIN,避免进度表残留大量僵尸通道。

RocketMQ 5.5.0 的 LiteTopic 把"百万级轻量通道"从云上专属能力变成了开源基础设施。如果你的系统正在为 AI Agent 会话或海量异步回调的通道管理头疼,现在可以直接在开源集群上验证这条专属通道。


相关推荐