处理海量文档时,你总会面对一个矛盾:实时场景要求秒级响应,离线场景则更在意成本控制。Amazon Bedrock 同时提供了按需推理(On-demand)和批量推理(Batch)两种调用模式,但多数人只用了其中一种。把两者组合成一条动态切换的管线,才能在延迟和费用之间找到最优解。
两种推理模式的本质差异
Bedrock 的按需推理是同步调用——请求发出后等待模型返回结果,适合单份文档、低延迟场景。批量推理则是异步提交一批任务,Bedrock 在后台调度执行,完成后写入 S3,适合成百上千份文档的集中处理。
关键对比:
| 维度 | 按需推理 | 批量推理 |
|---|---|---|
| 调用方式 | 同步,即时返回 | 异步,提交后轮询/回调 |
| 响应延迟 | 秒级 | 分钟到小时级 |
| 计费模型 | 按请求 token 计费 | 批量单价更低,有折扣 |
| 适用场景 | 单文档实时抽取 | 大规模离线抽取 |
| 错误处理 | 单次失败可立即重试 | 批量失败需重新提交整批 |
核心思路:文档少、要快 → 按需;文档多、要省 → 批量。但真实业务中,两种需求经常交替出现,所以管线需要动态路由。
动态路由的决策逻辑
路由决策至少要考虑三个变量:
- 文档队列深度:当前待处理文档数量是否超过批量阈值。
- 时效要求:业务方是否标注了"紧急"标签。
- 成本预算:当前周期预算是否允许按需调用。
一个简单的路由策略可以这样定义:
def choose_inference_mode(queue_size, urgent_flag, budget_remaining):
"""
根据队列深度、紧急标记和预算余量选择推理模式。
返回 "on_demand" 或 "batch"。
"""
BATCH_THRESHOLD = 10 # 超过 10 份文档才触发批量
if urgent_flag:
# 紧急文档始终走按需,保证速度
return "on_demand"
if queue_size >= BATCH_THRESHOLD and budget_remaining > 0:
# 大批量且有预算 → 批量推理更划算
return "batch"
# 默认走按需,简单可靠
return "on_demand"
实际生产中,阈值和策略参数应该从配置中心读取,方便随时调整。
完整管线实现示例
下面给出一个可改造运行的最小管线,包含按需调用、批量提交与结果回收三个核心环节。
1. 按需推理——单文档实时抽取
import boto3
import json
bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")
MODEL_ID = "anthropic.claude-3-5-sonnet-20241022-v2:0"
def extract_on_demand(document_text: str) -> dict:
"""按需调用 Bedrock,实时抽取文档结构化数据。"""
prompt = f"""
你是一个文档数据抽取引擎。请从以下文档内容中提取:
- 标题
- 日期(如有)
- 关键金额或数值(如有)
- 摘要(不超过 100 字)
请以 JSON 格式输出,字段名用英文。
文档内容:
{document_text}
"""
body = {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1024,
"messages": [{"role": "user", "content": prompt}],
}
response = bedrock.invoke_model(
modelId=MODEL_ID,
body=json.dumps(body),
contentType="application/json",
accept="application/json",
)
result = json.loads(response["body"].read())
extracted = result["content"][0]["text"]
# 尝试解析为 JSON;若模型输出不严格,做兜底处理
try:
return json.loads(extracted)
except json.JSONDecodeError:
return {"raw_output": extracted}
2. 批量推理——大规模离线抽取
批量推理需要先把输入数据写入 S3,再提交批量任务:
import time
bedrock_batch = boto3.client("bedrock", region_name="us-east-1")
s3 = boto3.client("s3", region_name="us-east-1")
INPUT_BUCKET = "my-doc-pipeline-input"
OUTPUT_BUCKET = "my-doc-pipeline-output"
INPUT_PREFIX = "batch-input/"
OUTPUT_PREFIX = "batch-output/"
def prepare_batch_input(documents: list[dict], batch_id: str) -> str:
"""将多份文档组装为 Bedrock 批量推理所需的 JSONL 文件,上传到 S3。"""
s3_key = f"{INPUT_PREFIX}{batch_id}/input.jsonl"
lines = []
for doc in documents:
record = {
"recordId": doc["id"],
"modelInput": {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1024,
"messages": [
{"role": "user", "content": f"从以下文档中抽取标题、日期、金额、摘要,以 JSON 输出:\n{doc['text']}"}
],
},
}
lines.append(json.dumps(record))
s3.put_object(
Bucket=INPUT_BUCKET,
Key=s3_key,
Body="\n".join(lines),
)
return f"s3://{INPUT_BUCKET}/{s3_key}"
def submit_batch_job(input_s3_uri: str, batch_id: str) -> str:
"""提交 Bedrock 批量推理任务,返回 job ARN。"""
output_s3_uri = f"s3://{OUTPUT_BUCKET}/{OUTPUT_PREFIX}{batch_id}/"
response = bedrock_batch.create_model_invocation_job(
jobName=f"doc-extract-{batch_id}",
modelId=MODEL_ID,
inputDataConfig={
"s3InputDataConfig": {"s3Uri": input_s3_uri},
},
outputDataConfig={
"s3OutputDataConfig": {"s3Uri": output_s3_uri},
},
roleArn="arn:aws:iam::123456789012:role/BedrockBatchRole", # 替换为你的角色
)
return response["jobArn"]
def wait_for_batch_job(job_arn: str, timeout: int = 3600) -> str:
"""轮询批量任务状态,直到完成或超时。"""
elapsed = 0
poll_interval = 30
while elapsed < timeout:
status = bedrock_batch.get_model_invocation_job(jobIdentifier=job_arn)
job_status = status["status"]
if job_status == "Completed":
return status["outputDataConfig"]["s3OutputDataConfig"]["s3Uri"]
elif job_status in ("Failed", "Stopped"):
raise RuntimeError(f"Batch job failed: {status['message']}")
time.sleep(poll_interval)
elapsed += poll_interval
raise TimeoutError("Batch job did not complete within timeout")
3. 组装动态管线
把路由决策和两种推理模式串起来:
def process_documents(documents: list[dict], urgent_ids: set[str] = None):
"""
动态管线入口:按路由策略分流文档,
紧急文档走按需,其余走批量。
"""
urgent_ids = urgent_ids or set()
urgent_docs = [d for d in documents if d["id"] in urgent_ids]
normal_docs = [d for d in documents if d["id"] not in urgent_ids]
results = {}
# ---- 紧急文档:按需推理 ----
for doc in urgent_docs:
extracted = extract_on_demand(doc["text"])
results[doc["id"]] = extracted
# ---- 普通文档:批量推理 ----
if len(normal_docs) >= 5: # 这里用 5 作为演示阈值
batch_id = f"batch-{int(time.time())}"
input_uri = prepare_batch_input(normal_docs, batch_id)
job_arn = submit_batch_job(input_uri, batch_id)
output_uri = wait_for_batch_job(job_arn)
# 从 S3 读取批量输出
output_key = f"{OUTPUT_PREFIX}{batch_id}/output.jsonl"
obj = s3.get_object(Bucket=OUTPUT_BUCKET, Key=output_key)
for line in obj["Body"].read().decode().splitlines():
record = json.loads(line)
doc_id = record["recordId"]
model_output = record["modelOutput"]["content"][0]["text"]
try:
results[doc_id] = json.loads(model_output)
except json.JSONDecodeError:
results[doc_id] = {"raw_output": model_output}
elif normal_docs:
# 文档太少,不值得走批量,降级为按需
for doc in normal_docs:
results[doc["id"]] = extract_on_demand(doc["text"])
return results
运行前需要确保:
- IAM 角色
BedrockBatchRole拥有对输入/输出 S3 桶的读写权限以及bedrock:InvokeModel权限。 - S3 桶已创建,且前缀路径可写。
MODEL_ID替换为你实际启用的模型。
选用建议与注意事项
何时优先批量:日处理量超过百份、对延迟容忍度在 10 分钟以上、有明确的成本优化目标。批量推理的单 token 价格通常低于按需,大批量下差异显著。
何时优先按需:用户实时提交单份文档并等待结果、API 网关后端需要同步返回、文档量不稳定且经常只有零星几份。
混合管线的关键风险点:
- 批量任务失败重试成本:整批失败需要重新提交,设计时要考虑分批粒度——每批 50-200 份文档比一次提交 10000 份更安全。
- 输出格式一致性:同一模型在按需和批量下的输出格式理论上一致,但批量场景下模型可能因上下文窗口差异产生微妙的输出变化,建议在管线末端加一层 JSON 校验与修复。
- 冷启动延迟:批量任务从提交到开始执行可能有数分钟等待,紧急文档绝不能走批量。
- S3 路径冲突:不同批次任务的输出路径必须隔离,否则会覆盖结果。
快速检查清单:
- [ ] 按需与批量两种模式是否已在 Bedrock 控制台启用对应模型?
- [ ] IAM 角色是否同时授权了
bedrock-runtime:InvokeModel和bedrock:CreateModelInvocationJob? - [ ] 批量输入 JSONL 的
recordId是否全局唯一? - [ ] 是否设置了批量任务超时与失败告警?
- [ ] 紧急文档路由是否绕过了批量队列?
- [ ] 输出解析是否对 JSON 格式异常做了兜底?
动态切换不是复杂架构,而是对 Bedrock 已有能力的组合利用。把路由阈值配进参数中心,监控两种模式的实际成本与延迟,持续调整阈值——这才是管线真正"弹性"的来源。