语音代理、实时字幕、呼叫中心分析、无障碍辅助工具——这些场景的共同底座是实时语音转文字(streaming speech-to-text)。用户一边说话,系统一边出结果,整条链路跑在一条持久连接上,延迟必须压到毫秒级。
传统推理模式在这里直接卡壳:请求-响应架构要求音频全部收齐后才开始推理,等一段 10 秒的录音传完再出结果,体验已经崩了。Amazon SageMaker AI 搭配 vLLM,把流式推理搬上生产环境,让"边收边推"成为可部署的现实。
传统推理为什么撑不住实时语音
经典推理流程是这样的:
- 客户端把完整音频 POST 到服务端;
- 服务端收到全部字节后,送入模型做一次前向传播;
- 返回整段转录文本。
问题出在第 1 步——等待。一段 30 秒的语音,哪怕网络带宽充裕,光"攒数据"就要吃掉几十到几百毫秒;加上模型推理本身的时间,用户感知到的延迟往往超过 1 秒。对于语音代理场景,1 秒的停顿意味着对话节奏断裂,用户会以为系统卡死或没听懂。
更深层的问题是连接模型。HTTP 请求-响应是一次性的,每次转录都要重新建连接、重新传上下文。多轮对话里,前面说了什么全靠客户端自己拼回去,服务端没有状态延续。
vLLM 的流式推理:一条连接,持续推结果
vLLM 是高吞吐 LLM 推理引擎,核心优势之一是原生支持 streaming:模型每生成一个 token(或一小段文本)就立刻推给客户端,不需要等整段输出完成。
把这套机制搬到语音场景,逻辑变成:
- 音频流以 chunk 形式持续推入(比如每 100 ms 一个 chunk);
- 服务端收到 chunk 后立即送入模型做增量推理;
- 转录片段通过同一条 SSE/WebSocket 连接持续回推;
- 客户端拼接片段,呈现"边说边出字"的效果。
关键变化:推理不再等数据收齐,而是跟着数据流滚动推进。
在 SageMaker AI 上部署流式语音推理端点
SageMaker AI 提供了托管推理端点,支持 streaming response。结合 vLLM 的 continuous batching 和 PagedAttention,可以在单端点上同时服务多路语音流,GPU 显存利用率远高于传统逐请求调度。
下面是一个完整的部署示例,把 vLLM 打包成 SageMaker 端点并开启流式响应。
1. 打包模型镜像并推到 ECR
# 假设你已经有一个 vLLM 兼容的语音模型(如 Whisper-large-v3 的 vLLM 适配版)
# 以下使用 vLLM 官方镜像作为基础
# 登录 ECR
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin 763104351884.dkr.ecr.us-east-1.amazonaws.com
# 拉取 vLLM 基础镜像
docker pull vllm/vllm-openai:latest
# 如果需要自定义 entrypoint 或挂载语音预处理逻辑,可在此基础上构建
# 这里直接使用官方镜像,模型权重从 S3 加载
2. 用 SageMaker Python SDK 创建流式推理端点
import sagemaker
from sagemaker.model import Model
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.session.Session()
# 模型权重已上传到 S3
model_s3_path = "s3://my-voice-models/whisper-large-v3-vllm/"
model = Model(
image_uri="vllm/vllm-openai:latest", # 替换为你推到 ECR 的镜像 URI
model_data=model_s3_path,
role=role,
sagemaker_session=sagemaker_session,
env={
# vLLM 启动参数
"MODEL_NAME": "/opt/ml/model", # SageMaker 标准模型挂载路径
"VLLM_SERVE_FROM_S3": "0",
# 开启 streaming
"ENABLE_STREAMING": "1",
# GPU 并行与 batch 配置
"TENSOR_PARALLEL_SIZE": "1",
"MAX_NUM_SEQS": "32", # 同时服务 32 路语音流
"MAX_NUM_BATCHED_TOKENS": "4096",
},
)
# 部署到 ml.g5.12xlarge(4xA10G),根据模型大小调整实例类型
predictor = model.deploy(
initial_instance_count=1,
instance_type="ml.g5.12xlarge",
endpoint_name="voice-streaming-vllm",
)
注意:实际部署中,
image_uri应替换为你推到 AWS ECR 的镜像地址,格式为<account_id>.dkr.ecr.<region>.amazonaws.com/<repo>:<tag>。vLLM 官方镜像不直接在 SageMaker 上运行,需要适配 SageMaker 的/opt/ml/model挂载约定和容器入口脚本。
3. 客户端流式调用示例
import json
import requests
endpoint_url = "https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/voice-streaming-vllm/invocations"
# 模拟:将音频分 chunk 流式推送
# 实际场景中,chunk 来自麦克风实时采集
audio_chunks = [
b"<binary_audio_chunk_1>", # 前 100ms 音频
b"<binary_audio_chunk_2>", # 接下来 100ms
b"<binary_audio_chunk_3>",
# ... 持续推送
]
# 使用 SageMaker InvokeEndpointWithResponseStream(流式响应 API)
import boto3
sm_runtime = boto3.client("sagemaker-runtime")
# 发送第一个 chunk,开启流式会话
# 实际生产中需配合 WebSocket 或自定义流式协议
response = sm_runtime.invoke_endpoint(
EndpointName="voice-streaming-vllm",
ContentType="application/json",
Body=json.dumps({
"model": "whisper-large-v3",
"audio": "<base64_encoded_chunk>",
"stream": True,
"language": "zh",
}),
)
# 解析流式返回
result = json.loads(response["Body"].read())
print(result["text"]) # 增量转录片段
# ---- 更完整的流式调用(SageMaker Streaming API)----
# SageMaker 2024 年新增 InvokeEndpointWithResponseStream
stream_response = sm_runtime.invoke_endpoint_with_response_stream(
EndpointName="voice-streaming-vllm",
ContentType="application/json",
Body=json.dumps({
"model": "whisper-large-v3",
"audio": "<base64_encoded_full_or_chunk>",
"stream": True,
}),
)
for event in stream_response["Body"]:
chunk = json.loads(event["PayloadFragment"])
print(chunk.get("text", ""), end="", flush=True)
# 输出效果:文字逐片段出现,像打字机一样
4. 音频预处理:从麦克风到推理端点
实际部署中,音频采集→预处理→推入端点是一条完整管线。以下是一个简化版实时采集 + 推理的客户端骨架:
import pyaudio
import base64
import json
import threading
import queue
CHUNK_DURATION_MS = 100 # 每 100ms 采集一个 chunk
SAMPLE_RATE = 16000
CHUNK_SIZE = int(SAMPLE_RATE * CHUNK_DURATION_MS / 1000)
audio_queue = queue.Queue()
# 音频采集线程
def capture_audio():
pa = pyaudio.PyAudio()
stream = pa.open(
format=pyaudio.paInt16,
channels=1,
rate=SAMPLE_RATE,
input=True,
frames_per_buffer=CHUNK_SIZE,
)
while True:
data = stream.read(CHUNK_SIZE, exception_on_overflow=False)
audio_queue.put(data)
threading.Thread(target=capture_audio, daemon=True).start()
# 推理推送线程——实际生产中应使用 WebSocket 持久连接
# 此处简化为逐 chunk 调用
import boto3
sm_runtime = boto3.client("sagemaker-runtime")
full_transcript = ""
while True:
chunk_data = audio_queue.get()
encoded = base64.b64encode(chunk_data).decode("utf-8")
resp = sm_runtime.invoke_endpoint_with_response_stream(
EndpointName="voice-streaming-vllm",
ContentType="application/json",
Body=json.dumps({
"model": "whisper-large-v3",
"audio_chunk": encoded,
"stream": True,
"is_final": False, # 标记是否为最后一个 chunk
}),
)
for event in resp["Body"]:
fragment = json.loads(event["PayloadFragment"])
if "text" in fragment:
full_transcript += fragment["text"]
print(f"[实时] {full_transcript}")
实践提示:上面的逐 chunk 调用是简化演示。真实生产中,应建立一条 WebSocket/SSE 持久连接,音频 chunk 在同一条连接上持续推送,转录片段在同一条连接上持续回传,避免反复建连的开销。SageMaker 的
InvokeEndpointWithResponseStream支持这种模式。
性能调优的几个关键点
把流式语音推理跑起来只是第一步,要压到生产级延迟,还需要注意以下配置:
| 参数 | 作用 | 建议值 |
|---|---|---|
MAX_NUM_SEQS |
同时在 GPU 上跑的语音流数量 | 16-64,取决于模型大小和 GPU 显存 |
MAX_NUM_BATCHED_TOKENS |
单次 batch 前向传播的最大 token 数 | 4096-8192,越大吞吐越高但单流延迟可能上升 |
CHUNK_DURATION_MS |
客户端音频 chunk 时长 | 50-200ms;太短则推理碎片化,太长则延迟上升 |
TENSOR_PARALLEL_SIZE |
跨 GPU 张量并行度 | 大模型(≥7B)设为实例 GPU 数量 |
一个常见误区是把 MAX_NUM_SEQS 设得过大。vLLM 的 continuous batching 会把所有活跃请求拼进同一个 batch,seqs 过多时单次前向传播变慢,每条流的 token 推出间隔反而拉长。对于语音代理这类延迟敏感场景,建议控制在 16-32 路,优先保单流延迟。
上线前的检查清单
把流式语音推理端点推上生产之前,逐项确认:
- 连接协议:生产环境优先用 WebSocket 或 SSE,不要逐 chunk 发 HTTP 请求;
- 断线重连:语音流中途断连后,客户端需要能从上一个确认的 chunk 位置续传,服务端需要能丢弃半截状态;
- 冷启动:SageMaker 端点首次调用有模型加载延迟(大模型可能 30-60s),用 provisioned concurrency 或预热请求消除;
- 多路隔离:不同语音流不能共享 KV cache 中的上下文,确认 vLLM 的 prefix caching 不会跨请求串数据;
- 合规与隐私:语音数据属于高敏感信息,确认端点 VPC 配置、数据加密、日志脱敏策略;
- 降级方案:GPU 端点故障时,是否有 fallback 到非流式推理或较小模型的备选路径。
实时语音推理不是"把批处理模型套个流式壳"就完事——从音频采集粒度、连接生命周期到 GPU batch 调度,整条链路都要为"边收边推"重新设计。SageMaker AI + vLLM 提供了托管基础设施和流式推理引擎,剩下的架构决策落在你手上。