用 SageMaker AI 和 vLLM 构建实时语音应用:从"等音频传完"到"边说边转"

2026-05-21 30 预计阅读时间:1 分钟
来源:aws.amazon.com AI 摘要 原文链接

免责声明:本文为 AI 摘要整理,建议结合原文阅读。摘要可能省略上下文、版本差异或边界条件,不作为官方说明。

预计阅读时间:12 分钟

语音代理、实时字幕、呼叫中心分析、无障碍辅助工具——这些场景的共同底座是实时语音转文字(streaming speech-to-text)。用户一边说话,系统一边出结果,整条链路跑在一条持久连接上,延迟必须压到毫秒级。

传统推理模式在这里直接卡壳:请求-响应架构要求音频全部收齐后才开始推理,等一段 10 秒的录音传完再出结果,体验已经崩了。Amazon SageMaker AI 搭配 vLLM,把流式推理搬上生产环境,让"边收边推"成为可部署的现实。

传统推理为什么撑不住实时语音

经典推理流程是这样的:

  1. 客户端把完整音频 POST 到服务端;
  2. 服务端收到全部字节后,送入模型做一次前向传播;
  3. 返回整段转录文本。

问题出在第 1 步——等待。一段 30 秒的语音,哪怕网络带宽充裕,光"攒数据"就要吃掉几十到几百毫秒;加上模型推理本身的时间,用户感知到的延迟往往超过 1 秒。对于语音代理场景,1 秒的停顿意味着对话节奏断裂,用户会以为系统卡死或没听懂。

更深层的问题是连接模型。HTTP 请求-响应是一次性的,每次转录都要重新建连接、重新传上下文。多轮对话里,前面说了什么全靠客户端自己拼回去,服务端没有状态延续。

vLLM 的流式推理:一条连接,持续推结果

vLLM 是高吞吐 LLM 推理引擎,核心优势之一是原生支持 streaming:模型每生成一个 token(或一小段文本)就立刻推给客户端,不需要等整段输出完成。

把这套机制搬到语音场景,逻辑变成:

  • 音频流以 chunk 形式持续推入(比如每 100 ms 一个 chunk);
  • 服务端收到 chunk 后立即送入模型做增量推理;
  • 转录片段通过同一条 SSE/WebSocket 连接持续回推;
  • 客户端拼接片段,呈现"边说边出字"的效果。

关键变化:推理不再等数据收齐,而是跟着数据流滚动推进

在 SageMaker AI 上部署流式语音推理端点

SageMaker AI 提供了托管推理端点,支持 streaming response。结合 vLLM 的 continuous batching 和 PagedAttention,可以在单端点上同时服务多路语音流,GPU 显存利用率远高于传统逐请求调度。

下面是一个完整的部署示例,把 vLLM 打包成 SageMaker 端点并开启流式响应。

1. 打包模型镜像并推到 ECR

# 假设你已经有一个 vLLM 兼容的语音模型(如 Whisper-large-v3 的 vLLM 适配版)
# 以下使用 vLLM 官方镜像作为基础

# 登录 ECR
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin 763104351884.dkr.ecr.us-east-1.amazonaws.com

# 拉取 vLLM 基础镜像
docker pull vllm/vllm-openai:latest

# 如果需要自定义 entrypoint 或挂载语音预处理逻辑,可在此基础上构建
# 这里直接使用官方镜像,模型权重从 S3 加载

2. 用 SageMaker Python SDK 创建流式推理端点

import sagemaker
from sagemaker.model import Model

role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.session.Session()

# 模型权重已上传到 S3
model_s3_path = "s3://my-voice-models/whisper-large-v3-vllm/"

model = Model(
    image_uri="vllm/vllm-openai:latest",  # 替换为你推到 ECR 的镜像 URI
    model_data=model_s3_path,
    role=role,
    sagemaker_session=sagemaker_session,
    env={
        # vLLM 启动参数
        "MODEL_NAME": "/opt/ml/model",  # SageMaker 标准模型挂载路径
        "VLLM_SERVE_FROM_S3": "0",
        # 开启 streaming
        "ENABLE_STREAMING": "1",
        # GPU 并行与 batch 配置
        "TENSOR_PARALLEL_SIZE": "1",
        "MAX_NUM_SEQS": "32",  # 同时服务 32 路语音流
        "MAX_NUM_BATCHED_TOKENS": "4096",
    },
)

# 部署到 ml.g5.12xlarge(4xA10G),根据模型大小调整实例类型
predictor = model.deploy(
    initial_instance_count=1,
    instance_type="ml.g5.12xlarge",
    endpoint_name="voice-streaming-vllm",
)

注意:实际部署中,image_uri 应替换为你推到 AWS ECR 的镜像地址,格式为 <account_id>.dkr.ecr.<region>.amazonaws.com/<repo>:<tag>。vLLM 官方镜像不直接在 SageMaker 上运行,需要适配 SageMaker 的 /opt/ml/model 挂载约定和容器入口脚本。

3. 客户端流式调用示例

import json
import requests

endpoint_url = "https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/voice-streaming-vllm/invocations"

# 模拟:将音频分 chunk 流式推送
# 实际场景中,chunk 来自麦克风实时采集
audio_chunks = [
    b"<binary_audio_chunk_1>",  # 前 100ms 音频
    b"<binary_audio_chunk_2>",  # 接下来 100ms
    b"<binary_audio_chunk_3>",
    # ... 持续推送
]

# 使用 SageMaker InvokeEndpointWithResponseStream(流式响应 API)
import boto3

sm_runtime = boto3.client("sagemaker-runtime")

# 发送第一个 chunk,开启流式会话
# 实际生产中需配合 WebSocket 或自定义流式协议
response = sm_runtime.invoke_endpoint(
    EndpointName="voice-streaming-vllm",
    ContentType="application/json",
    Body=json.dumps({
        "model": "whisper-large-v3",
        "audio": "<base64_encoded_chunk>",
        "stream": True,
        "language": "zh",
    }),
)

# 解析流式返回
result = json.loads(response["Body"].read())
print(result["text"])  # 增量转录片段

# ---- 更完整的流式调用(SageMaker Streaming API)----
# SageMaker 2024 年新增 InvokeEndpointWithResponseStream
stream_response = sm_runtime.invoke_endpoint_with_response_stream(
    EndpointName="voice-streaming-vllm",
    ContentType="application/json",
    Body=json.dumps({
        "model": "whisper-large-v3",
        "audio": "<base64_encoded_full_or_chunk>",
        "stream": True,
    }),
)

for event in stream_response["Body"]:
    chunk = json.loads(event["PayloadFragment"])
    print(chunk.get("text", ""), end="", flush=True)
# 输出效果:文字逐片段出现,像打字机一样

4. 音频预处理:从麦克风到推理端点

实际部署中,音频采集→预处理→推入端点是一条完整管线。以下是一个简化版实时采集 + 推理的客户端骨架:

import pyaudio
import base64
import json
import threading
import queue

CHUNK_DURATION_MS = 100  # 每 100ms 采集一个 chunk
SAMPLE_RATE = 16000
CHUNK_SIZE = int(SAMPLE_RATE * CHUNK_DURATION_MS / 1000)

audio_queue = queue.Queue()

# 音频采集线程
def capture_audio():
    pa = pyaudio.PyAudio()
    stream = pa.open(
        format=pyaudio.paInt16,
        channels=1,
        rate=SAMPLE_RATE,
        input=True,
        frames_per_buffer=CHUNK_SIZE,
    )
    while True:
        data = stream.read(CHUNK_SIZE, exception_on_overflow=False)
        audio_queue.put(data)

threading.Thread(target=capture_audio, daemon=True).start()

# 推理推送线程——实际生产中应使用 WebSocket 持久连接
# 此处简化为逐 chunk 调用
import boto3
sm_runtime = boto3.client("sagemaker-runtime")

full_transcript = ""

while True:
    chunk_data = audio_queue.get()
    encoded = base64.b64encode(chunk_data).decode("utf-8")

    resp = sm_runtime.invoke_endpoint_with_response_stream(
        EndpointName="voice-streaming-vllm",
        ContentType="application/json",
        Body=json.dumps({
            "model": "whisper-large-v3",
            "audio_chunk": encoded,
            "stream": True,
            "is_final": False,  # 标记是否为最后一个 chunk
        }),
    )

    for event in resp["Body"]:
        fragment = json.loads(event["PayloadFragment"])
        if "text" in fragment:
            full_transcript += fragment["text"]
            print(f"[实时] {full_transcript}")

实践提示:上面的逐 chunk 调用是简化演示。真实生产中,应建立一条 WebSocket/SSE 持久连接,音频 chunk 在同一条连接上持续推送,转录片段在同一条连接上持续回传,避免反复建连的开销。SageMaker 的 InvokeEndpointWithResponseStream 支持这种模式。

性能调优的几个关键点

把流式语音推理跑起来只是第一步,要压到生产级延迟,还需要注意以下配置:

参数 作用 建议值
MAX_NUM_SEQS 同时在 GPU 上跑的语音流数量 16-64,取决于模型大小和 GPU 显存
MAX_NUM_BATCHED_TOKENS 单次 batch 前向传播的最大 token 数 4096-8192,越大吞吐越高但单流延迟可能上升
CHUNK_DURATION_MS 客户端音频 chunk 时长 50-200ms;太短则推理碎片化,太长则延迟上升
TENSOR_PARALLEL_SIZE 跨 GPU 张量并行度 大模型(≥7B)设为实例 GPU 数量

一个常见误区是把 MAX_NUM_SEQS 设得过大。vLLM 的 continuous batching 会把所有活跃请求拼进同一个 batch,seqs 过多时单次前向传播变慢,每条流的 token 推出间隔反而拉长。对于语音代理这类延迟敏感场景,建议控制在 16-32 路,优先保单流延迟。

上线前的检查清单

把流式语音推理端点推上生产之前,逐项确认:

  • 连接协议:生产环境优先用 WebSocket 或 SSE,不要逐 chunk 发 HTTP 请求;
  • 断线重连:语音流中途断连后,客户端需要能从上一个确认的 chunk 位置续传,服务端需要能丢弃半截状态;
  • 冷启动:SageMaker 端点首次调用有模型加载延迟(大模型可能 30-60s),用 provisioned concurrency 或预热请求消除;
  • 多路隔离:不同语音流不能共享 KV cache 中的上下文,确认 vLLM 的 prefix caching 不会跨请求串数据;
  • 合规与隐私:语音数据属于高敏感信息,确认端点 VPC 配置、数据加密、日志脱敏策略;
  • 降级方案:GPU 端点故障时,是否有 fallback 到非流式推理或较小模型的备选路径。

实时语音推理不是"把批处理模型套个流式壳"就完事——从音频采集粒度、连接生命周期到 GPU batch 调度,整条链路都要为"边收边推"重新设计。SageMaker AI + vLLM 提供了托管基础设施和流式推理引擎,剩下的架构决策落在你手上。


相关推荐