用 MCP + Agentic AI 在代码审查中自动对齐安全需求与实现

2026-06-12 31 预计阅读时间: 1 分钟
来源: dropbox.tech AI 摘要 Original link

Disclaimer: This article is an AI-assisted summary. Read it together with the original source when precision matters. The summary may omit context, version differences, or edge cases and is not official documentation.

预计阅读时间:12 分钟

安全设计文档写得很完整,代码提交时却悄悄偏离——这是很多团队的真实痛点。威胁模型在 Confluence 里沉睡,PR 里的实现早已走了另一条路。Dropbox 近期公开了他们的做法:用 MCP(Model Context Protocol)把 Dash 内部的安全知识喂给 Agentic AI,让 AI 在代码审查阶段主动比对设计意图与实际代码,把"设计-实现"的安全缝隙暴露出来。

设计到代码的缝隙是怎么产生的

典型流程是这样的:架构阶段产出威胁模型和安全需求文档,开发阶段各自写代码,审查阶段靠人眼比对。问题出在最后一步——审查者往往不会逐条对照威胁模型,尤其是当文档分散在不同系统、格式不统一时。

缝隙的具体表现:

  • 威胁模型标注"所有外部输入必须校验长度与类型",但 PR 里直接用了 request.args.get("id") 传进 SQL,没有校验。
  • 安全需求要求"敏感操作需二次认证",实现里只加了 @login_required,没有走 MFA。
  • 文档写了"日志不得包含 PII",代码里 logger.info(f"User {email} logged in") 照样打印邮箱。

这些偏差靠人肉审查很难系统性捕获,因为审查者的注意力在逻辑正确性,不在安全合规性。

Dropbox 的思路:让 AI 主动做比对

Dropbox 的方案核心是三件事:

  1. Dash 作为知识源:Dash 是 Dropbox 的内部搜索与知识平台,里面索引了威胁模型、安全需求文档、架构决策记录等。通过 MCP,AI Agent 可以实时查询这些文档。
  2. MCP 作为桥梁:MCP 让 AI Agent 以标准化方式调用外部工具和数据源。Dash 暴露为 MCP Server,Agent 在审查 PR 时可以拉取相关安全文档。
  3. Agentic AI 做比对:Agent 不是被动等指令,而是主动读取 PR diff → 查询 Dash 中对应的威胁模型 → 比对需求与实现 → 输出缝隙报告。

关键转变:安全审查从"人记得就去查"变成了"AI 每次都查"。

用 MCP 把安全知识接入代码审查

下面用一个可运行的示例展示如何搭建 MCP Server,让 AI Agent 在代码审查时获取威胁模型信息。这个示例基于开源 MCP Python SDK,你可以直接改造接入自己的知识库。

第一步:定义威胁模型数据

假设你的威胁模型以 JSON 结构存储:

// threat_models/api_user_query.json
{
  "component": "UserQueryAPI",
  "threats": [
    {
      "id": "TM-001",
      "description": "SQL injection via unsanitized user input",
      "mitigation": "All user inputs must be validated for type and length; use parameterized queries only",
      "severity": "high"
    },
    {
      "id": "TM-002",
      "description": "PII leakage in logs",
      "mitigation": "Logs must not contain email, phone, or real name; use anonymized identifiers",
      "severity": "medium"
    }
  ],
  "security_requirements": [
    "REQ-SEC-01: Parameterized queries for all DB operations",
    "REQ-SEC-02: PII must be redacted before logging"
  ]
}

第二步:搭建 MCP Server 暴露威胁模型查询

# mcp_threat_server.py
import json
from pathlib import Path
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("threat-model-server")

THREAT_DIR = Path("./threat_models")

@mcp.tool()
def get_threat_model(component: str) -> str:
    """Retrieve the threat model for a given component name.
    Returns the full threat model JSON including threats and security requirements."""
    file_path = THREAT_DIR / f"{component.lower()}.json"
    if not file_path.exists():
        return f"No threat model found for component: {component}"
    return json.dumps(json.loads(file_path.read_text()), indent=2)

@mcp.tool()
def list_components() -> str:
    """List all components that have threat models available."""
    models = [f.stem for f in THREAT_DIR.glob("*.json")]
    return json.dumps(models)

@mcp.tool()
def check_requirement_coverage(component: str, requirement_id: str, pr_diff: str) -> str:
    """Check if a specific security requirement appears to be addressed in the PR diff.
    Returns an analysis of whether the implementation covers the requirement."""
    file_path = THREAT_DIR / f"{component.lower()}.json"
    if not file_path.exists():
        return f"No threat model for {component}"
    model = json.loads(file_path.read_text())
    reqs = model.get("security_requirements", [])
    matched = [r for r in reqs if requirement_id in r]
    if not matched:
        return f"Requirement {requirement_id} not found in threat model for {component}"
    return json.dumps({
        "requirement": matched[0],
        "pr_diff_summary": pr_diff[:500],
        "note": "Agent should analyze whether the diff implements this requirement"
    })

if __name__ == "__main__":
    mcp.run()

运行方式:

# 安装 MCP SDK
pip install mcp

# 启动 Server(默认 stdio 传输)
python mcp_threat_server.py

第三步:在代码审查流程中调用

以下是一个简化的审查 Agent 逻辑,展示如何让 AI 在看到 PR diff 后主动比对威胁模型:

# review_agent.py
import subprocess
import json

def get_pr_diff(pr_number: int) -> str:
    """模拟获取 PR diff——实际中可调用 GitHub API"""
    # 示例:用 gh CLI 获取 diff
    result = subprocess.run(
        ["gh", "pr", "diff", str(pr_number)],
        capture_output=True, text=True
    )
    return result.stdout

def extract_component_from_diff(diff: str) -> str:
    """从 diff 中推断涉及的组件名——简化版"""
    # 实际中可用更智能的路径映射或 LLM 分类
    if "api/user_query" in diff or "user_query" in diff:
        return "UserQueryAPI"
    return "Unknown"

def run_security_gap_check(pr_number: int):
    diff = get_pr_diff(pr_number)
    component = extract_component_from_diff(diff)

    # 通过 MCP Client 查询威胁模型(这里用直接调用模拟)
    # 实际部署中 Agent 通过 MCP 协议与 Server 交互
    threat_model_path = f"./threat_models/{component.lower()}.json"
    try:
        model = json.loads(Path(threat_model_path).read_text())
    except FileNotFoundError:
        print(f"[WARN] No threat model for {component}, skipping security check")
        return

    print(f"=== Security Gap Report for PR #{pr_number} ===")
    print(f"Component: {component}")
    print(f"Threats in model: {len(model['threats'])}")
    print(f"Security requirements: {len(model['security_requirements'])}")
    print()

    # 检查每条安全需求是否在 diff 中有对应实现
    for req in model["security_requirements"]:
        req_keywords = extract_keywords(req)
        found = any(kw in diff.lower() for kw in req_keywords)
        status = "COVERED" if found else "GAP"
        print(f"  [{status}] {req}")
        if not found:
            print(f"         → No evidence in PR diff that this requirement is addressed")

    # 检查威胁是否被缓解
    for threat in model["threats"]:
        mitigation_keywords = extract_keywords(threat["mitigation"])
        found = any(kw in diff.lower() for kw in mitigation_keywords)
        status = "MITIGATED" if found else "UNMITIGATED"
        print(f"  [{status}] {threat['id']}: {threat['description']}")

def extract_keywords(text: str) -> list[str]:
    """从需求/缓解措施中提取关键检测词——简化版"""
    # 实际中可用 LLM 提取更精准的语义关键词
    keywords = []
    for word in text.lower().split():
        if len(word) > 4 and word not in {"must", "should", "all", "only", "before"}:
            keywords.append(word)
    return keywords

# 使用示例
run_security_gap_check(pr_number=42)

输出效果类似:

=== Security Gap Report for PR #42 ===
Component: UserQueryAPI
Threats in model: 2
Security requirements: 2

  [GAP] REQ-SEC-01: Parameterized queries for all DB operations
          No evidence in PR diff that this requirement is addressed
  [GAP] REQ-SEC-02: PII must be redacted before logging
          No evidence in PR diff that this requirement is addressed
  [UNMITIGATED] TM-001: SQL injection via unsanitized user input
  [UNMITIGATED] TM-002: PII leakage in logs

关键词匹配是简化版。真实场景中,Dropbox 用 Agentic AI 做语义级比对——Agent 理解"parameterized queries"意味着代码里应该出现 ? 占位符或 ORM 调用,而不是在 diff 里搜 "parameterized" 这个词。

超越关键词:让 Agent 做语义比对

关键词匹配只能抓表面。真正的价值在于 Agent 能做三层比对:

比对层级 做什么 举例
字面覆盖 需求关键词是否出现在 diff 中 "MFA" 出现在代码注释里
语义覆盖 实现是否真正满足需求意图 需求要 MFA,代码只加了 @login_required——字面有认证,语义不够
缺失发现 需求没提但 diff 暴露了新风险 PR 新增了文件上传接口,威胁模型没覆盖这个场景

Dropbox 的 Agent 重点在第二和第三层。它不只回答"需求有没有被提到",而是判断"实现是否真正达标",以及"这个 PR 是否引入了威胁模型之外的新风险"。

落地时需要面对的现实问题

知识库的覆盖度:MCP 能查到的前提是威胁模型确实写了并存进了 Dash。很多团队的威胁模型覆盖率不到 50%,Agent 查不到就无法比对。起步阶段可以先覆盖核心服务和高风险接口。

比对的误报率:语义比对不是精确匹配,Agent 可能误判。比如把 ORM 的 filter() 误认为没有参数化。需要给 Agent 提供项目特定的安全编码规范作为校准上下文。

Agent 的权限边界:Agent 只做发现和报告,不做阻断。代码审查的最终决定权在人。把 Agent 输出作为 PR comment 而不是 gate,团队接受度更高。

MCP Server 的稳定性:Dash 作为 MCP Server 需要保证查询延迟和可用性。代码审查是实时流程,如果查威胁模型要等 10 秒,审查者会跳过。建议做本地缓存或预加载常用组件的模型。

可以这样起步

  1. 选一个高风险服务,写完整的威胁模型 JSON,放进知识库。
  2. 搭一个最小 MCP Server,只暴露 get_threat_model 一个工具,先用上面的示例代码跑通。
  3. 在 CI 或 PR hook 里接入 Agent,先只做字面覆盖检查,观察一周的输出质量。
  4. 逐步升级到语义比对:接入 LLM,让 Agent 用威胁模型的缓解描述作为 prompt 上下文,分析 diff 是否真正满足要求。
  5. 扩展覆盖范围:从 1 个服务到 N 个,从已有威胁模型到"PR 引入了模型外新风险"的主动发现。

Dropbox 的实践证明了一点:安全缝隙的核心问题不是人不够专业,而是安全知识和代码审查之间的连接太弱。MCP + Agentic AI 做的事情很简单——每次审查代码时,自动把对应的安全要求摆到桌面上。


相关推荐