医院和诊所每年产生大量纸质病历,扫描成 PDF 后仍然无法被系统检索、统计或交换。手动录入耗时且易出错,而传统 OCR 只能提取文字,无法理解临床语义。Amazon Bedrock Data Automation(BDA)的推出改变了这一局面——它不仅能识别文档结构,还能将医疗内容映射为 FHIR R4 资源,再配合 AWS HealthLake 的 FHIR 存储与查询能力,一条无服务器管线就能把扫描件变成可用的结构化数据,部署时间不到 20 分钟。
管线整体架构
整条管线围绕事件驱动、无服务器设计,核心流程如下:
- 上传触发:扫描 PDF 上传至 S3 输入桶,EventBridge 规则捕获
PutObject事件。 - 文档解析:Step Functions 编排工作流,首先调用 Bedrock Data Automation,对 PDF 进行多模态分析——识别表格、段落、关键临床字段,并输出 FHIR R4 JSON Bundle。
- FHIR 入库:解析结果通过 Lambda 函数调用 HealthLake 的 FHIR API(
POST /fhir/r4),将 Bundle 中的每个 Resource 逐条写入 HealthLake 数据存储。 - 结果通知:入库完成后,SNS 发送通知;若任一步骤失败,Step Functions 的 catch 分支写入错误日志并告警。
整条链路中没有常驻服务器,每个服务按调用计费,适合间歇性、批量上传的场景。
各服务的关键角色
Bedrock Data Automation:从图像到 FHIR
BDA 是这条管线的"大脑"。与普通 OCR 不同,它内置了医疗文档的理解能力:
- 识别出院小结、化验报告、处方单等常见文档类型。
- 提取患者姓名、出生日期、诊断编码、用药列表等临床字段。
- 直接输出 FHIR R4 格式的 JSON,包括 Patient、Condition、MedicationRequest 等资源类型。
这意味着你不需要自己写映射逻辑——BDA 已经把"血红蛋白 120g/L"翻译成了 Observation 资源,code 绑定 LOINC,valueQuantity 带单位。
AWS HealthLake:FHIR 数据存储与查询
HealthLake 提供一个完全托管、符合 FHIR R4 标准的数据存储。它的核心能力:
- 原生支持 FHIR CRUD 操作与搜索参数。
- 自动为文本型字段建立索引,支持按患者、日期、诊断等维度检索。
- 内置合规特性(HIPAA eligible),数据加密存储。
管线中 Lambda 调用 HealthLake 时,只需构造标准 FHIR HTTP 请求,无需额外建表或写索引逻辑。
Step Functions:编排与容错
Step Functions 负责串联 BDA 调用与 HealthLake 写入,并处理异常:
- BDA 调用超时或返回异常 → 进入 catch 分支,记录原始 S3 路径与错误信息。
- HealthLake 写入失败 → 重试最多 3 次(指数退避),仍失败则写入 DLQ(Dead Letter Queue)。
- 全部成功 → SNS 通知下游系统(如 BI 工具、临床决策支持系统)。
实战部署:20 分钟上手
下面给出一个可改造的部署方案,使用 AWS CLI + CloudFormation 快速拉起整条管线。假设你已有 AWS 账号且开启了 Bedrock Data Automation 和 HealthLake。
第一步:创建 HealthLake FHIR 数据存储
aws healthlake create-fhir-datastore \
--datastore-name "medical-records-store" \
--datastore-type-version R4 \
--preload-data-config PreloadDataType=SYNTHEA \
--region us-east-1
返回中会包含 DatastoreId,记下来后续使用。预加载 Synthea 示例数据可帮助验证查询,生产环境可去掉 --preload-data-config。
数据存储创建需要约 15 分钟,状态变为 ACTIVE 后即可写入。
第二步:部署管线基础设施
以下 CloudFormation 模板定义了 S3 桶、Step Functions 工作流、Lambda 和 EventBridge 规则的核心骨架。你需要替换 <DATASTORE_ID> 为上一步获得的 ID。
AWSTemplateFormatVersion: '2010-09-09'
Description: Medical record digitization pipeline - BDA + HealthLake
Parameters:
DatastoreId:
Type: String
Description: HealthLake FHIR Datastore ID
Resources:
# S3 input bucket for scanned PDFs
MedicalRecordsBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub 'medical-records-input-${AWS::AccountId}'
VersioningConfiguration:
Status: Enabled
# IAM role for Step Functions workflow
WorkflowRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: states.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaRole
Policies:
- PolicyName: BDAAccess
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- bedrock:InvokeModel
- bedrock-data-automation:*
Resource: '*'
- PolicyName: S3Access
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- s3:GetObject
- s3:PutObject
Resource: !Sub '${MedicalRecordsBucket.Arn}/*'
# Lambda: write FHIR resources to HealthLake
FhirWriterLambda:
Type: AWS::Lambda::Function
Properties:
FunctionName: fhir-writer-to-healthlake
Runtime: python3.12
Handler: index.handler
Role: !GetAtt FhirWriterRole.Arn
Timeout: 30
Environment:
Variables:
DATASTORE_ID: !Ref DatastoreId
REGION: !Ref AWS::Region
Code:
ZipFile: |
import json, os, urllib.request, boto3
DATASTORE_ID = os.environ['DATASTORE_ID']
REGION = os.environ['REGION']
def handler(event, context):
fhir_bundle = json.loads(event['fhir_output'])
endpoint = f"https://healthlake.{REGION}.amazonaws.com/datastore/{DATASTORE_ID}/r4"
results = []
for resource in fhir_bundle.get('entry', []):
rtype = resource['resource']['resourceType']
data = json.dumps(resource['resource']).encode('utf-8')
req = urllib.request.Request(
f"{endpoint}/{rtype}",
data=data,
headers={'Content-Type': 'application/fhir+json'},
method='POST'
)
resp = urllib.request.urlopen(req)
results.append(json.loads(resp.read()))
return {"written_count": len(results), "status": "success"}
FhirWriterRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyName: HealthLakeWrite
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- healthlake:CreateResource
- healthlake:UpdateResource
Resource: !Sub 'arn:aws:healthlake:${AWS::Region}:${AWS::AccountId}:datastore/${DatastoreId}'
# Step Functions workflow
DigitizationWorkflow:
Type: AWS::StepFunctions::StateMachine
Properties:
StateMachineName: medical-record-digitization
RoleArn: !GetAtt WorkflowRole.Arn
DefinitionString: !Sub |
{
"Comment": "PDF -> BDA -> FHIR -> HealthLake",
"StartAt": "InvokeBDA",
"States": {
"InvokeBDA": {
"Type": "Task",
"Resource": "arn:aws:states:::bedrock:invokeModel",
"Parameters": {
"ModelId": "amazon.bedrock-data-automation-v1",
"Input": {
"S3Uri": "s3://${MedicalRecordsBucket}/{$.object_key}"
},
"OutputFormat": "FHIR_R4"
},
"Next": "WriteToHealthLake",
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.error",
"Next": "LogFailure"
}
]
},
"WriteToHealthLake": {
"Type": "Task",
"Resource": "${FhirWriterLambda.Arn}",
"Parameters": {
"fhir_output.$": "$.Output"
},
"Retry": [
{
"ErrorEquals": ["States.ALL"],
"IntervalSeconds": 5,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.error",
"Next": "LogFailure"
}
],
"Next": "NotifySuccess"
},
"NotifySuccess": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"Message": "Digitization complete for {$.object_key}",
"TopicArn": "${SuccessTopic}"
},
"End": true
},
"LogFailure": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"Message": "Digitization failed: {$.error}",
"TopicArn": "${FailureTopic}"
},
"End": true
}
}
}
SuccessTopic:
Type: AWS::SNS::Topic
Properties:
TopicName: digitization-success
FailureTopic:
Type: AWS::SNS::Topic
Properties:
TopicName: digitization-failure
# EventBridge rule: S3 PutObject triggers workflow
S3EventRule:
Type: AWS::Events::Rule
Properties:
EventPattern:
source:
- aws.s3
detail-type:
- Object Created
detail:
bucket:
name:
- !Ref MedicalRecordsBucket
object:
key:
- suffix: .pdf
Targets:
- Arn: !Ref DigitizationWorkflow
Id: StartDigitizationWorkflow
RoleArn: !GetAtt EventBridgeRole.Arn
EventBridgeRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: events.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: StartWorkflow
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action: states:StartExecution
Resource: !Ref DigitizationWorkflow
部署命令:
# 替换为你的 DatastoreId
DATASTORE_ID=dsl-1234567890abcdef
aws cloudformation deploy \
--template-file pipeline.yaml \
--stack-name medical-digitization \
--parameter-overrides DatastoreId=$DATASTORE_ID \
--capabilities CAPABILITY_IAM \
--region us-east-1
第三步:上传一份扫描病历测试
aws s3 cp sample_discharge_summary.pdf \
s3://medical-records-input-<ACCOUNT_ID>/sample_discharge_summary.pdf
上传后 EventBridge 自动触发工作流。你可以在 Step Functions 控制台查看执行状态,在 HealthLake 中查询结果:
# 查询刚入库的患者资源
DATASTORE_ENDPOINT="https://healthlake.us-east-1.amazonaws.com/datastore/$DATASTORE_ID/r4"
curl -s -X GET "$DATASTORE_ENDPOINT/Patient?name=张" \
-H "Content-Type: application/fhir+json" \
--aws-sigv4 "aws:amz:us-east-1:healthlake" | python3 -m json.tool
注意:上述
curl --aws-sigv4需要 aws-sigv4-proxy 或类似工具辅助签名。生产环境建议通过 Lambda 或 SDK 调用,而非直接 curl。
需要留意的边界与取舍
| 维度 | 说明 |
|---|---|
| 文档类型覆盖 | BDA 目前对常见出院小结、化验单支持较好;非标准格式(手写笔记、非英文文档)的识别率可能下降,需测试验证 |
| FHIR 映射精度 | 自动映射并非 100% 准确,关键字段(诊断编码、药物剂量)建议加入人工复核环节 |
| 成本模型 | BDA 按页计费,HealthLake 按 FHIR 请求与存储量计费。大批量历史数据迁移前,先用小样本测算单页成本 |
| 合规责任 | HealthLake 是 HIPAA eligible,但整条管线的合规仍需你自行评估——S3 桶加密、Lambda 日志脱敏、传输层 TLS 都需要检查 |
| 延迟 | BDA 处理一份 10 页 PDF 通常在数十秒内完成,但 HealthLake 数据存储首次创建需 ~15 分钟 |
上线前的检查清单
- 小样本验证:选 20 份真实扫描件跑一遍管线,人工比对 BDA 输出的 FHIR Bundle 与原文,记录字段遗漏率。
- 异常路径测试:故意上传非医疗 PDF(如发票),确认 catch 分支正确告警,不会写入脏数据。
- 加密配置:确认 S3 桶启用 SSE-KMS,HealthLake 数据存储启用客户托管密钥。
- 日志与审计:开启 CloudTrail 记录所有 HealthLake API 调用;Lambda 日志中避免输出患者敏感字段。
- 重试与 DLQ:确认 Step Functions 的重试策略符合业务容忍度,DLQ 中的失败消息有定期处理机制。
这条管线把"扫描 PDF → 结构化 FHIR 数据"的门槛压到了极低,但医疗数据的准确性要求远高于一般文档。建议先在低风险场景(如历史档案检索)跑通,积累映射准确度的基线数据,再逐步接入临床流程。