用按需与批量推理动态切换,构建弹性文档抽取管线

2026-06-12 29 预计阅读时间: 1 分钟
来源: aws.amazon.com AI 摘要 Original link

Disclaimer: This article is an AI-assisted summary. Read it together with the original source when precision matters. The summary may omit context, version differences, or edge cases and is not official documentation.

预计阅读时间:11 分钟

处理海量文档时,你总会面对一个矛盾:实时场景要求秒级响应,离线场景则更在意成本控制。Amazon Bedrock 同时提供了按需推理(On-demand)和批量推理(Batch)两种调用模式,但多数人只用了其中一种。把两者组合成一条动态切换的管线,才能在延迟和费用之间找到最优解。

两种推理模式的本质差异

Bedrock 的按需推理是同步调用——请求发出后等待模型返回结果,适合单份文档、低延迟场景。批量推理则是异步提交一批任务,Bedrock 在后台调度执行,完成后写入 S3,适合成百上千份文档的集中处理。

关键对比:

维度 按需推理 批量推理
调用方式 同步,即时返回 异步,提交后轮询/回调
响应延迟 秒级 分钟到小时级
计费模型 按请求 token 计费 批量单价更低,有折扣
适用场景 单文档实时抽取 大规模离线抽取
错误处理 单次失败可立即重试 批量失败需重新提交整批

核心思路:文档少、要快 → 按需;文档多、要省 → 批量。但真实业务中,两种需求经常交替出现,所以管线需要动态路由。

动态路由的决策逻辑

路由决策至少要考虑三个变量:

  1. 文档队列深度:当前待处理文档数量是否超过批量阈值。
  2. 时效要求:业务方是否标注了"紧急"标签。
  3. 成本预算:当前周期预算是否允许按需调用。

一个简单的路由策略可以这样定义:

def choose_inference_mode(queue_size, urgent_flag, budget_remaining):
    """
    根据队列深度、紧急标记和预算余量选择推理模式。
    返回 "on_demand" 或 "batch"。
    """
    BATCH_THRESHOLD = 10  # 超过 10 份文档才触发批量

    if urgent_flag:
        # 紧急文档始终走按需,保证速度
        return "on_demand"

    if queue_size >= BATCH_THRESHOLD and budget_remaining > 0:
        # 大批量且有预算 → 批量推理更划算
        return "batch"

    # 默认走按需,简单可靠
    return "on_demand"

实际生产中,阈值和策略参数应该从配置中心读取,方便随时调整。

完整管线实现示例

下面给出一个可改造运行的最小管线,包含按需调用、批量提交与结果回收三个核心环节。

1. 按需推理——单文档实时抽取

import boto3
import json

bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")

MODEL_ID = "anthropic.claude-3-5-sonnet-20241022-v2:0"

def extract_on_demand(document_text: str) -> dict:
    """按需调用 Bedrock,实时抽取文档结构化数据。"""

    prompt = f"""
你是一个文档数据抽取引擎。请从以下文档内容中提取:
- 标题
- 日期(如有)
- 关键金额或数值(如有)
- 摘要(不超过 100 字)

请以 JSON 格式输出,字段名用英文。

文档内容:
{document_text}
"""

    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1024,
        "messages": [{"role": "user", "content": prompt}],
    }

    response = bedrock.invoke_model(
        modelId=MODEL_ID,
        body=json.dumps(body),
        contentType="application/json",
        accept="application/json",
    )

    result = json.loads(response["body"].read())
    extracted = result["content"][0]["text"]

    # 尝试解析为 JSON;若模型输出不严格,做兜底处理
    try:
        return json.loads(extracted)
    except json.JSONDecodeError:
        return {"raw_output": extracted}

2. 批量推理——大规模离线抽取

批量推理需要先把输入数据写入 S3,再提交批量任务:

import time

bedrock_batch = boto3.client("bedrock", region_name="us-east-1")
s3 = boto3.client("s3", region_name="us-east-1")

INPUT_BUCKET = "my-doc-pipeline-input"
OUTPUT_BUCKET = "my-doc-pipeline-output"
INPUT_PREFIX = "batch-input/"
OUTPUT_PREFIX = "batch-output/"

def prepare_batch_input(documents: list[dict], batch_id: str) -> str:
    """将多份文档组装为 Bedrock 批量推理所需的 JSONL 文件,上传到 S3。"""

    s3_key = f"{INPUT_PREFIX}{batch_id}/input.jsonl"
    lines = []
    for doc in documents:
        record = {
            "recordId": doc["id"],
            "modelInput": {
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 1024,
                "messages": [
                    {"role": "user", "content": f"从以下文档中抽取标题、日期、金额、摘要,以 JSON 输出:\n{doc['text']}"}
                ],
            },
        }
        lines.append(json.dumps(record))

    s3.put_object(
        Bucket=INPUT_BUCKET,
        Key=s3_key,
        Body="\n".join(lines),
    )

    return f"s3://{INPUT_BUCKET}/{s3_key}"


def submit_batch_job(input_s3_uri: str, batch_id: str) -> str:
    """提交 Bedrock 批量推理任务,返回 job ARN。"""

    output_s3_uri = f"s3://{OUTPUT_BUCKET}/{OUTPUT_PREFIX}{batch_id}/"

    response = bedrock_batch.create_model_invocation_job(
        jobName=f"doc-extract-{batch_id}",
        modelId=MODEL_ID,
        inputDataConfig={
            "s3InputDataConfig": {"s3Uri": input_s3_uri},
        },
        outputDataConfig={
            "s3OutputDataConfig": {"s3Uri": output_s3_uri},
        },
        roleArn="arn:aws:iam::123456789012:role/BedrockBatchRole",  # 替换为你的角色
    )

    return response["jobArn"]


def wait_for_batch_job(job_arn: str, timeout: int = 3600) -> str:
    """轮询批量任务状态,直到完成或超时。"""

    elapsed = 0
    poll_interval = 30

    while elapsed < timeout:
        status = bedrock_batch.get_model_invocation_job(jobIdentifier=job_arn)
        job_status = status["status"]

        if job_status == "Completed":
            return status["outputDataConfig"]["s3OutputDataConfig"]["s3Uri"]
        elif job_status in ("Failed", "Stopped"):
            raise RuntimeError(f"Batch job failed: {status['message']}")

        time.sleep(poll_interval)
        elapsed += poll_interval

    raise TimeoutError("Batch job did not complete within timeout")

3. 组装动态管线

把路由决策和两种推理模式串起来:

def process_documents(documents: list[dict], urgent_ids: set[str] = None):
    """
    动态管线入口:按路由策略分流文档,
    紧急文档走按需,其余走批量。
    """
    urgent_ids = urgent_ids or set()
    urgent_docs = [d for d in documents if d["id"] in urgent_ids]
    normal_docs = [d for d in documents if d["id"] not in urgent_ids]

    results = {}

    # ---- 紧急文档:按需推理 ----
    for doc in urgent_docs:
        extracted = extract_on_demand(doc["text"])
        results[doc["id"]] = extracted

    # ---- 普通文档:批量推理 ----
    if len(normal_docs) >= 5:  # 这里用 5 作为演示阈值
        batch_id = f"batch-{int(time.time())}"
        input_uri = prepare_batch_input(normal_docs, batch_id)
        job_arn = submit_batch_job(input_uri, batch_id)
        output_uri = wait_for_batch_job(job_arn)

        # 从 S3 读取批量输出
        output_key = f"{OUTPUT_PREFIX}{batch_id}/output.jsonl"
        obj = s3.get_object(Bucket=OUTPUT_BUCKET, Key=output_key)
        for line in obj["Body"].read().decode().splitlines():
            record = json.loads(line)
            doc_id = record["recordId"]
            model_output = record["modelOutput"]["content"][0]["text"]
            try:
                results[doc_id] = json.loads(model_output)
            except json.JSONDecodeError:
                results[doc_id] = {"raw_output": model_output}

    elif normal_docs:
        # 文档太少,不值得走批量,降级为按需
        for doc in normal_docs:
            results[doc["id"]] = extract_on_demand(doc["text"])

    return results

运行前需要确保:

  • IAM 角色 BedrockBatchRole 拥有对输入/输出 S3 桶的读写权限以及 bedrock:InvokeModel 权限。
  • S3 桶已创建,且前缀路径可写。
  • MODEL_ID 替换为你实际启用的模型。

选用建议与注意事项

何时优先批量:日处理量超过百份、对延迟容忍度在 10 分钟以上、有明确的成本优化目标。批量推理的单 token 价格通常低于按需,大批量下差异显著。

何时优先按需:用户实时提交单份文档并等待结果、API 网关后端需要同步返回、文档量不稳定且经常只有零星几份。

混合管线的关键风险点

  • 批量任务失败重试成本:整批失败需要重新提交,设计时要考虑分批粒度——每批 50-200 份文档比一次提交 10000 份更安全。
  • 输出格式一致性:同一模型在按需和批量下的输出格式理论上一致,但批量场景下模型可能因上下文窗口差异产生微妙的输出变化,建议在管线末端加一层 JSON 校验与修复。
  • 冷启动延迟:批量任务从提交到开始执行可能有数分钟等待,紧急文档绝不能走批量。
  • S3 路径冲突:不同批次任务的输出路径必须隔离,否则会覆盖结果。

快速检查清单

  • [ ] 按需与批量两种模式是否已在 Bedrock 控制台启用对应模型?
  • [ ] IAM 角色是否同时授权了 bedrock-runtime:InvokeModelbedrock:CreateModelInvocationJob
  • [ ] 批量输入 JSONL 的 recordId 是否全局唯一?
  • [ ] 是否设置了批量任务超时与失败告警?
  • [ ] 紧急文档路由是否绕过了批量队列?
  • [ ] 输出解析是否对 JSON 格式异常做了兜底?

动态切换不是复杂架构,而是对 Bedrock 已有能力的组合利用。把路由阈值配进参数中心,监控两种模式的实际成本与延迟,持续调整阈值——这才是管线真正"弹性"的来源。


相关推荐