从扫描 PDF 到 FHIR R4:用 Bedrock Data Automation 与 HealthLake 搭建病历数字化管线

2026-06-10 22 预计阅读时间: 1 分钟
来源: aws.amazon.com AI 摘要 Original link

Disclaimer: This article is an AI-assisted summary. Read it together with the original source when precision matters. The summary may omit context, version differences, or edge cases and is not official documentation.

预计阅读时间:12 分钟

医院和诊所每年产生大量纸质病历,扫描成 PDF 后仍然无法被系统检索、统计或交换。手动录入耗时且易出错,而传统 OCR 只能提取文字,无法理解临床语义。Amazon Bedrock Data Automation(BDA)的推出改变了这一局面——它不仅能识别文档结构,还能将医疗内容映射为 FHIR R4 资源,再配合 AWS HealthLake 的 FHIR 存储与查询能力,一条无服务器管线就能把扫描件变成可用的结构化数据,部署时间不到 20 分钟。

管线整体架构

整条管线围绕事件驱动、无服务器设计,核心流程如下:

  1. 上传触发:扫描 PDF 上传至 S3 输入桶,EventBridge 规则捕获 PutObject 事件。
  2. 文档解析:Step Functions 编排工作流,首先调用 Bedrock Data Automation,对 PDF 进行多模态分析——识别表格、段落、关键临床字段,并输出 FHIR R4 JSON Bundle。
  3. FHIR 入库:解析结果通过 Lambda 函数调用 HealthLake 的 FHIR API(POST /fhir/r4),将 Bundle 中的每个 Resource 逐条写入 HealthLake 数据存储。
  4. 结果通知:入库完成后,SNS 发送通知;若任一步骤失败,Step Functions 的 catch 分支写入错误日志并告警。

整条链路中没有常驻服务器,每个服务按调用计费,适合间歇性、批量上传的场景。

各服务的关键角色

Bedrock Data Automation:从图像到 FHIR

BDA 是这条管线的"大脑"。与普通 OCR 不同,它内置了医疗文档的理解能力:

  • 识别出院小结、化验报告、处方单等常见文档类型。
  • 提取患者姓名、出生日期、诊断编码、用药列表等临床字段。
  • 直接输出 FHIR R4 格式的 JSON,包括 Patient、Condition、MedicationRequest 等资源类型。

这意味着你不需要自己写映射逻辑——BDA 已经把"血红蛋白 120g/L"翻译成了 Observation 资源,code 绑定 LOINC,valueQuantity 带单位。

AWS HealthLake:FHIR 数据存储与查询

HealthLake 提供一个完全托管、符合 FHIR R4 标准的数据存储。它的核心能力:

  • 原生支持 FHIR CRUD 操作与搜索参数。
  • 自动为文本型字段建立索引,支持按患者、日期、诊断等维度检索。
  • 内置合规特性(HIPAA eligible),数据加密存储。

管线中 Lambda 调用 HealthLake 时,只需构造标准 FHIR HTTP 请求,无需额外建表或写索引逻辑。

Step Functions:编排与容错

Step Functions 负责串联 BDA 调用与 HealthLake 写入,并处理异常:

  • BDA 调用超时或返回异常 → 进入 catch 分支,记录原始 S3 路径与错误信息。
  • HealthLake 写入失败 → 重试最多 3 次(指数退避),仍失败则写入 DLQ(Dead Letter Queue)。
  • 全部成功 → SNS 通知下游系统(如 BI 工具、临床决策支持系统)。

实战部署:20 分钟上手

下面给出一个可改造的部署方案,使用 AWS CLI + CloudFormation 快速拉起整条管线。假设你已有 AWS 账号且开启了 Bedrock Data Automation 和 HealthLake。

第一步:创建 HealthLake FHIR 数据存储

aws healthlake create-fhir-datastore \
  --datastore-name "medical-records-store" \
  --datastore-type-version R4 \
  --preload-data-config PreloadDataType=SYNTHEA \
  --region us-east-1

返回中会包含 DatastoreId,记下来后续使用。预加载 Synthea 示例数据可帮助验证查询,生产环境可去掉 --preload-data-config

数据存储创建需要约 15 分钟,状态变为 ACTIVE 后即可写入。

第二步:部署管线基础设施

以下 CloudFormation 模板定义了 S3 桶、Step Functions 工作流、Lambda 和 EventBridge 规则的核心骨架。你需要替换 <DATASTORE_ID> 为上一步获得的 ID。

AWSTemplateFormatVersion: '2010-09-09'
Description: Medical record digitization pipeline - BDA + HealthLake

Parameters:
  DatastoreId:
    Type: String
    Description: HealthLake FHIR Datastore ID

Resources:
  # S3 input bucket for scanned PDFs
  MedicalRecordsBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub 'medical-records-input-${AWS::AccountId}'
      VersioningConfiguration:
        Status: Enabled

  # IAM role for Step Functions workflow
  WorkflowRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: states.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaRole
      Policies:
        - PolicyName: BDAAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - bedrock:InvokeModel
                  - bedrock-data-automation:*
                Resource: '*'
        - PolicyName: S3Access
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:PutObject
                Resource: !Sub '${MedicalRecordsBucket.Arn}/*'

  # Lambda: write FHIR resources to HealthLake
  FhirWriterLambda:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: fhir-writer-to-healthlake
      Runtime: python3.12
      Handler: index.handler
      Role: !GetAtt FhirWriterRole.Arn
      Timeout: 30
      Environment:
        Variables:
          DATASTORE_ID: !Ref DatastoreId
          REGION: !Ref AWS::Region
      Code:
        ZipFile: |
          import json, os, urllib.request, boto3

          DATASTORE_ID = os.environ['DATASTORE_ID']
          REGION = os.environ['REGION']

          def handler(event, context):
              fhir_bundle = json.loads(event['fhir_output'])
              endpoint = f"https://healthlake.{REGION}.amazonaws.com/datastore/{DATASTORE_ID}/r4"
              results = []
              for resource in fhir_bundle.get('entry', []):
                  rtype = resource['resource']['resourceType']
                  data = json.dumps(resource['resource']).encode('utf-8')
                  req = urllib.request.Request(
                      f"{endpoint}/{rtype}",
                      data=data,
                      headers={'Content-Type': 'application/fhir+json'},
                      method='POST'
                  )
                  resp = urllib.request.urlopen(req)
                  results.append(json.loads(resp.read()))
              return {"written_count": len(results), "status": "success"}

  FhirWriterRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: HealthLakeWrite
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - healthlake:CreateResource
                  - healthlake:UpdateResource
                Resource: !Sub 'arn:aws:healthlake:${AWS::Region}:${AWS::AccountId}:datastore/${DatastoreId}'

  # Step Functions workflow
  DigitizationWorkflow:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: medical-record-digitization
      RoleArn: !GetAtt WorkflowRole.Arn
      DefinitionString: !Sub |
        {
          "Comment": "PDF -> BDA -> FHIR -> HealthLake",
          "StartAt": "InvokeBDA",
          "States": {
            "InvokeBDA": {
              "Type": "Task",
              "Resource": "arn:aws:states:::bedrock:invokeModel",
              "Parameters": {
                "ModelId": "amazon.bedrock-data-automation-v1",
                "Input": {
                  "S3Uri": "s3://${MedicalRecordsBucket}/{$.object_key}"
                },
                "OutputFormat": "FHIR_R4"
              },
              "Next": "WriteToHealthLake",
              "Catch": [
                {
                  "ErrorEquals": ["States.ALL"],
                  "ResultPath": "$.error",
                  "Next": "LogFailure"
                }
              ]
            },
            "WriteToHealthLake": {
              "Type": "Task",
              "Resource": "${FhirWriterLambda.Arn}",
              "Parameters": {
                "fhir_output.$": "$.Output"
              },
              "Retry": [
                {
                  "ErrorEquals": ["States.ALL"],
                  "IntervalSeconds": 5,
                  "MaxAttempts": 3,
                  "BackoffRate": 2.0
                }
              ],
              "Catch": [
                {
                  "ErrorEquals": ["States.ALL"],
                  "ResultPath": "$.error",
                  "Next": "LogFailure"
                }
              ],
              "Next": "NotifySuccess"
            },
            "NotifySuccess": {
              "Type": "Task",
              "Resource": "arn:aws:states:::sns:publish",
              "Parameters": {
                "Message": "Digitization complete for {$.object_key}",
                "TopicArn": "${SuccessTopic}"
              },
              "End": true
            },
            "LogFailure": {
              "Type": "Task",
              "Resource": "arn:aws:states:::sns:publish",
              "Parameters": {
                "Message": "Digitization failed: {$.error}",
                "TopicArn": "${FailureTopic}"
              },
              "End": true
            }
          }
        }

  SuccessTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: digitization-success

  FailureTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: digitization-failure

  # EventBridge rule: S3 PutObject triggers workflow
  S3EventRule:
    Type: AWS::Events::Rule
    Properties:
      EventPattern:
        source:
          - aws.s3
        detail-type:
          - Object Created
        detail:
          bucket:
            name:
              - !Ref MedicalRecordsBucket
          object:
            key:
              - suffix: .pdf
      Targets:
        - Arn: !Ref DigitizationWorkflow
          Id: StartDigitizationWorkflow
          RoleArn: !GetAtt EventBridgeRole.Arn

  EventBridgeRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: events.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: StartWorkflow
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: states:StartExecution
                Resource: !Ref DigitizationWorkflow

部署命令:

# 替换为你的 DatastoreId
DATASTORE_ID=dsl-1234567890abcdef

aws cloudformation deploy \
  --template-file pipeline.yaml \
  --stack-name medical-digitization \
  --parameter-overrides DatastoreId=$DATASTORE_ID \
  --capabilities CAPABILITY_IAM \
  --region us-east-1

第三步:上传一份扫描病历测试

aws s3 cp sample_discharge_summary.pdf \
  s3://medical-records-input-<ACCOUNT_ID>/sample_discharge_summary.pdf

上传后 EventBridge 自动触发工作流。你可以在 Step Functions 控制台查看执行状态,在 HealthLake 中查询结果:

# 查询刚入库的患者资源
DATASTORE_ENDPOINT="https://healthlake.us-east-1.amazonaws.com/datastore/$DATASTORE_ID/r4"

curl -s -X GET "$DATASTORE_ENDPOINT/Patient?name=张" \
  -H "Content-Type: application/fhir+json" \
  --aws-sigv4 "aws:amz:us-east-1:healthlake" | python3 -m json.tool

注意:上述 curl --aws-sigv4 需要 aws-sigv4-proxy 或类似工具辅助签名。生产环境建议通过 Lambda 或 SDK 调用,而非直接 curl。

需要留意的边界与取舍

维度 说明
文档类型覆盖 BDA 目前对常见出院小结、化验单支持较好;非标准格式(手写笔记、非英文文档)的识别率可能下降,需测试验证
FHIR 映射精度 自动映射并非 100% 准确,关键字段(诊断编码、药物剂量)建议加入人工复核环节
成本模型 BDA 按页计费,HealthLake 按 FHIR 请求与存储量计费。大批量历史数据迁移前,先用小样本测算单页成本
合规责任 HealthLake 是 HIPAA eligible,但整条管线的合规仍需你自行评估——S3 桶加密、Lambda 日志脱敏、传输层 TLS 都需要检查
延迟 BDA 处理一份 10 页 PDF 通常在数十秒内完成,但 HealthLake 数据存储首次创建需 ~15 分钟

上线前的检查清单

  1. 小样本验证:选 20 份真实扫描件跑一遍管线,人工比对 BDA 输出的 FHIR Bundle 与原文,记录字段遗漏率。
  2. 异常路径测试:故意上传非医疗 PDF(如发票),确认 catch 分支正确告警,不会写入脏数据。
  3. 加密配置:确认 S3 桶启用 SSE-KMS,HealthLake 数据存储启用客户托管密钥。
  4. 日志与审计:开启 CloudTrail 记录所有 HealthLake API 调用;Lambda 日志中避免输出患者敏感字段。
  5. 重试与 DLQ:确认 Step Functions 的重试策略符合业务容忍度,DLQ 中的失败消息有定期处理机制。

这条管线把"扫描 PDF → 结构化 FHIR 数据"的门槛压到了极低,但医疗数据的准确性要求远高于一般文档。建议先在低风险场景(如历史档案检索)跑通,积累映射准确度的基线数据,再逐步接入临床流程。


相关推荐