给 Cognito 加一层可扩展的用户搜索

2026-06-02 30 预计阅读时间:1 分钟
来源:aws.amazon.com AI 摘要 原文链接

免责声明:本文为 AI 摘要整理,建议结合原文阅读。摘要可能省略上下文、版本差异或边界条件,不作为官方说明。

预计阅读时间:10 分钟

Amazon Cognito 解决了认证和用户池管理的问题,但它的搜索能力非常有限——只能按用户名、邮箱、子 ID 等少数字段做精确匹配,不支持模糊搜索、组合条件筛选或全文检索。当你的应用需要"按姓名拼音搜用户""按注册时间范围+角色标签过滤"这类需求时,Cognito 自身的 ListUsers API 会很快成为瓶颈。

解决方案的思路很直接:把 Cognito 用户数据同步到专门的存储和检索层,用 DynamoDB 做结构化查询,用 OpenSearch 做全文和组合搜索,中间用 Lambda 做增量同步。

架构:从 Cognito 到可搜索的镜像

整体数据流如下:

Cognito User Pool
  │
  │  (PostConfirmation / PreSignUp / Custom Trigger)
  ▼
AWS Lambda (Sync Function)
  │
  ├─► DynamoDB Table    ← 结构化属性、精确查询、排序
  │
  └─► OpenSearch Index  ← 全文搜索、模糊匹配、聚合

关键设计点:

  • 触发时机:Cognito 支持 PostConfirmation(注册确认后)、PostAuthentication(登录后)等 Lambda Trigger。注册和属性修改是同步的主入口;对于已有存量用户,需要做一次全量回扫。
  • 双写而非选一:DynamoDB 擅长按主键或二级索引做精确查找(比如按 tenant_id 拉用户列表),OpenSearch 擅长模糊和组合查询(比如搜"名字含张且角色是 admin")。两者互补,不是替代关系。
  • 增量同步:每次 Cognito 事件触发 Lambda,Lambda 同时写入 DynamoDB 和 OpenSearch。OpenSearch 的 _bulk API 支持批量写入,单次 Lambda 调用可以合并多个事件。

Lambda 同步函数:核心实现

下面是一个可部署的 Lambda 函数,监听 Cognito PostConfirmation Trigger,将用户属性同步到 DynamoDB 和 OpenSearch:

import json
import os
import boto3
import urllib3
from datetime import datetime

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["DYNAMODB_TABLE_NAME"])

OPENSEARCH_ENDPOINT = os.environ["OPENSEARCH_ENDPOINT"]
OPENSEARCH_INDEX = os.environ["OPENSEARCH_INDEX"]
http = urllib3.PoolManager()

def sync_to_dynamodb(user_attrs: dict):
    """写入 DynamoDB,按 tenant_id 建立 gsi 方便按租户查用户"""
    item = {
        "user_id": user_attrs["sub"],
        "tenant_id": user_attrs.get("custom:tenant_id", "default"),
        "email": user_attrs.get("email", ""),
        "name": user_attrs.get("name", ""),
        "role": user_attrs.get("custom:role", "member"),
        "created_at": user_attrs.get("user_create_date", datetime.utcnow().isoformat()),
        "status": user_attrs.get("status", "ACTIVE"),
    }
    table.put_item(Item=item)

def sync_to_opensearch(user_attrs: dict):
    """单条写入 OpenSearch,生产环境可改为 _bulk 批量"""
    doc = {
        "user_id": user_attrs["sub"],
        "tenant_id": user_attrs.get("custom:tenant_id", "default"),
        "email": user_attrs.get("email", ""),
        "name": user_attrs.get("name", ""),
        "role": user_attrs.get("custom:role", "member"),
        "created_at": user_attrs.get("user_create_date", datetime.utcnow().isoformat()),
    }
    url = f"{OPENSEARCH_ENDPOINT}/{OPENSEARCH_INDEX}/_doc/{user_attrs['sub']}"
    # 使用 AWS SigV4 签名请求(生产环境推荐),这里简化为基本认证示例
    credentials = boto3.Session().get_credentials()
    # 实际部署请用 requests-aws4auth 或 opensearch-py 带签名的客户端
    response = http.request(
        "PUT",
        url,
        body=json.dumps(doc),
        headers={"Content-Type": "application/json"},
    )
    if response.status >= 300:
        raise Exception(f"OpenSearch write failed: {response.status} {response.data}")

def lambda_handler(event, context):
    """
    Cognito PostConfirmation Trigger 事件格式:
    event["request"]["userAttributes"] 包含 sub, email, name 等字段
    """
    user_attrs = event["request"]["userAttributes"]

    # 确保有 sub(Cognito 唯一 ID)
    if "sub" not in user_attrs:
        raise ValueError("Missing sub in user attributes")

    sync_to_dynamodb(user_attrs)
    sync_to_opensearch(user_attrs)

    # 返回原事件,Cognito 要求原样返回
    return event

部署前需要修改的地方:

  1. OPENSEARCH_ENDPOINT — 替换为你自己的 OpenSearch 域名(如 https://search-my-cluster.es.amazonaws.com)。
  2. 签名认证 — 生产环境不要用明文凭证,改用 opensearch-py 配合 AWS SigV4 签名,或在 VPC 内用精细访问策略。
  3. 自定义属性custom:tenant_idcustom:role 是 Cognito 自定义属性,需要在 User Pool 里提前声明。

DynamoDB 表设计:精确查询的底座

DynamoDB 表需要支持按租户、按角色等维度快速检索。推荐设计:

# SAM / CloudFormation 片段
MyUserTable:
  Type: AWS::DynamoDB::Table
  Properties:
    TableName: !Ref DynamoDBTableName
    BillingMode: PAY_PER_REQUEST
    AttributeDefinitions:
      - AttributeName: user_id
        AttributeType: S
      - AttributeName: tenant_id
        AttributeType: S
      - AttributeName: created_at
        AttributeType: S
    KeySchema:
      - AttributeName: user_id
        KeyType: HASH
    GlobalSecondaryIndexes:
      - IndexName: tenant-created-index
        KeySchema:
          - AttributeName: tenant_id
            KeyType: HASH
          - AttributeName: created_at
            KeyType: RANGE
        Projection:
          ProjectionType: ALL

这个设计下:

  • 按租户查用户列表:Query tenant-created-index,Hash Key = tenant_id,可按 created_at 范围排序。
  • 按 user_id 精确取单个用户:直接 GetItem 主键。
  • 按角色过滤:角色不是索引键,需要配合 OpenSearch 或在应用层做 FilterExpression。

OpenSearch 索引:模糊与组合搜索

OpenSearch 索引的 mapping 决定搜索质量。name 字段建议用 text 类型配中文分词器(如果用户名含中文),emailrolekeyword 做精确过滤:

PUT /users
{
  "mappings": {
    "properties": {
      "user_id":   { "type": "keyword" },
      "tenant_id": { "type": "keyword" },
      "email":     { "type": "keyword" },
      "name":      { "type": "text", "analyzer": "standard", "fields": {
                     "keyword": { "type": "keyword" }
                   }},
      "role":      { "type": "keyword" },
      "created_at":{ "type": "date" }
    }
  }
}

搜索示例——"名字含'张'、角色是 admin、最近 30 天注册":

GET /users/_search
{
  "query": {
    "bool": {
      "must": [
        { "match": { "name": "张" } }
      ],
      "filter": [
        { "term":  { "role": "admin" } },
        { "range": { "created_at": { "gte": "now-30d/d" } } }
      ]
    }
  }
}

全量回扫:处理存量用户

新用户通过 Trigger 同步,但已有的存量用户不在 Trigger 覆盖范围内。需要写一个一次性 Lambda 或 Step Functions 工作流做全量扫描:

import boto3

cognito = boto3.client("cognito-idp")
USER_POOL_ID = os.environ["USER_POOL_ID"]

def backfill_all_users():
    pagination_token = None
    while True:
        kwargs = {"UserPoolId": USER_POOL_ID, "Limit": 60}
        if pagination_token:
            kwargs["PaginationToken"] = pagination_token

        response = cognito.list_users(**kwargs)
        for user in response["Users"]:
            attrs = {a["Name"]: a["Value"] for a in user["Attributes"]}
            attrs["sub"] = attrs.get("sub", user["Username"])
            attrs["user_create_date"] = user["UserCreateDate"].isoformat()
            attrs["status"] = user["UserStatus"]
            sync_to_dynamodb(attrs)
            sync_to_opensearch(attrs)

        pagination_token = response.get("PaginationToken")
        if not pagination_token:
            break

注意 ListUsers 每次最多返回 60 条,大用户池需要多次分页。建议在非高峰时段执行,并给 Lambda 配足够的超时时间(或改用 Step Functions 做分页循环)。

一致性与延迟的取舍

这个架构有几个需要正视的问题:

  • 最终一致性:Cognito 事件到 DynamoDB/OpenSearch 写入之间有 Lambda 执行延迟(通常几百毫秒到几秒)。刚注册的用户可能短暂无法被搜到。如果业务要求强一致,可以在注册 API 返回前主动调用同步函数,而不是仅依赖 Trigger。
  • 属性变更同步:用户更新资料后,Cognito 的 PreSignUp / PostConfirmation Trigger 不一定覆盖所有变更场景。需要额外监听 PostAuthentication 或在应用层修改属性后主动触发同步。
  • OpenSearch 成本:用户量小时用 OpenSearch 可能显得重。如果全文搜索需求简单,可以先只用 DynamoDB + GSI,等模糊搜索需求明确后再引入 OpenSearch。
  • 删除与软删除:Cognito 删除用户时,Trigger 不会通知。需要在应用层处理:删除用户时同时从 DynamoDB 和 OpenSearch 移除记录,或标记 status: DELETED 做软删除。

上线检查清单

部署前逐项确认:

  • [ ] Cognito User Pool 的自定义属性(custom:tenant_id 等)已声明且可写
  • [ ] Lambda Trigger 已绑定到 PostConfirmation(以及你需要的其他 Trigger)
  • [ ] DynamoDB 表和 GSI 已创建,PAY_PER_REQUEST 模式避免冷启动限流
  • [ ] OpenSearch 域已创建,索引 mapping 已写入,访问策略允许 Lambda 写入
  • [ ] Lambda 执行角色有 dynamodb:PutItem、OpenSearch 域的写入权限、cognito-idp:ListUsers(回扫用)
  • [ ] 全量回扫脚本已测试,存量用户数据已同步
  • [ ] 搜索 API 的超时、分页、错误处理已覆盖
  • [ ] 监控:Lambda 错误率、DynamoDB 写入延迟、OpenSearch 索引健康状态已接入 CloudWatch

这套方案把 Cognito 从"只能按少数字段精确查"的局限里解放出来,同时保留了 Cognito 在认证流程上的核心职责。搜索层独立演进,后续加新索引、新分词策略、新聚合维度,都不需要动 Cognito 本身。


相关推荐