用 Claude 自主发现与修复代码漏洞:Anthropic 开源参考框架拆解

2026-06-05 28 预计阅读时间:1 分钟
来源:oschina.net AI 摘要 原文链接

免责声明:本文为 AI 摘要整理,建议结合原文阅读。摘要可能省略上下文、版本差异或边界条件,不作为官方说明。

预计阅读时间:15 分钟

安全团队做漏洞扫描,流程往往是:跑工具 → 人工审阅海量报告 → 确认真伪 → 写补丁 → 测试。中间两步最耗时,也最容易出错。Anthropic 最近在 GitHub 上开源了 Defending Code Reference Harness(简称 DCRH),把 Claude 嵌进这条流水线,让 Agent 自主完成从漏洞定位到补丁生成的闭环。这不是一个装好就能跑的商业产品,而是一套和安全团队反复迭代后的参考实现——代码、文档、设计决策全开放,目标是让安全研究者与 Agent 开发者拿来改,而不是拿来直接部署。

项目定位:参考实现,不是一键工具

DCRH 的 README 开头就划清了边界:这是一套参考 harness,不是面向终端用户的漏洞扫描器。它的价值在于展示了"如何用 LLM Agent 构建一个可审计、可定制的漏洞发现与修复闭环",而不是提供一个开箱即用的 SAST 替代品。

这意味着几件事:

  • 你需要理解 Agent 的编排逻辑才能有效定制;
  • 项目默认集成的扫描工具(如 Semgrep、CodeQL)只是示例,可以替换成你团队熟悉的工具;
  • Claude 的调用策略、prompt 结构、结果验证流程都暴露在代码中,方便你根据自身代码库的特点调整。

对安全团队来说,这更像是一份"我们试过了、踩坑了、最终沉淀下来的架构方案",而不是一个拿来扫描就出报告的黑盒。

核心架构:三层循环驱动漏洞闭环

DCRH 的 Agent 设计围绕三个核心阶段循环运行:

1. 漏洞发现(Discovery)

Agent 调用外部静态分析工具扫描目标代码库,拿到原始发现列表。这一步的关键不是工具本身,而是 Agent 如何筛选和优先级排序——原始报告里大量是低优先级或误报,Agent 需要根据代码上下文判断哪些值得深入分析。

2. 深度分析(Analysis)

对筛选后的候选漏洞,Agent 逐个深入:读取相关代码文件、追踪数据流、判断可达性。这一步 Claude 的长上下文能力发挥作用——它能同时看漏洞报告和跨文件调用链,判断一个 SQL 注入点是否真的有外部输入可达。

3. 补丁生成与验证(Patch & Verify)

确认漏洞后,Agent 生成修复代码,然后进入验证循环:运行测试套件、重新扫描确认漏洞已消除、检查补丁是否引入新问题。如果验证失败,Agent 会回退并重新生成补丁。

整个流程不是单次直通,而是迭代循环——发现 → 分析 → 补丁 → 验证 → 如果验证失败则回到补丁阶段。这种设计让 Agent 在面对复杂漏洞时不会卡死在第一次尝试上。

Agent 编排的关键设计决策

从项目代码中可以提炼出几个值得借鉴的设计选择:

工具调用与 LLM 推理的分工边界

DCRH 把确定性操作(文件读写、命令执行、工具调用)全部交给工具层,LLM 只负责推理和决策。这避免了 Claude 直接执行命令的风险,也让每一步操作都有明确的审计痕迹。

结构化输出约束

每个阶段 Agent 的输出都有严格的 JSON schema 约束——发现阶段输出漏洞列表及优先级评分,分析阶段输出可达性判断和影响范围,补丁阶段输出具体 diff。这种约束不是装饰,它让下游验证步骤可以程序化地消费上游结果,而不是靠 LLM 的自由文本。

沙箱化执行环境

补丁验证在隔离环境中运行:独立容器、受限网络、只读原始代码库加上补丁层。这防止了 Agent 在验证过程中意外修改生产代码或访问敏感资源。

实践:搭建一个最小化的漏洞发现 Agent

下面用一个可运行的示例展示 DCRH 的核心思路——用 Claude + Semgrep 构建一个单轮漏洞发现 Agent。这个示例比 DCRH 本身简单得多,但包含了关键的结构化输出和工具调用模式。

前提条件: 安装 Semgrep、配置 Anthropic API Key。

# 安装 semgrep
pip install semgrep

# 设置 API Key
export ANTHROPIC_API_KEY="sk-ant-xxxxx"

最小 Agent 实现:

import json
import subprocess
import anthropic

client = anthropic.Anthropic()

# 第一步:用 Semgrep 扫描目标代码库
def run_semgrep(target_path: str) -> list[dict]:
    """运行 Semgrep 扫描,返回结构化发现列表"""
    result = subprocess.run(
        ["semgrep", "--config", "auto", "--json", target_path],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        print(f"Semgrep error: {result.stderr}")
        return []
    scan_data = json.loads(result.stdout)
    findings = []
    for r in scan_data.get("results", []):
        findings.append({
            "rule_id": r["check_id"],
            "message": r["extra"]["message"],
            "severity": r["extra"]["severity"],
            "file": r["path"],
            "line": r["start"]["line"],
            "code_snippet": r["extra"]["lines"],
        })
    return findings

# 第二步:让 Claude 筛选和深度分析
def analyze_findings(findings: list[dict], target_path: str) -> list[dict]:
    """用 Claude 对原始发现进行筛选和可达性分析"""
    # 读取相关源文件,提供给 Claude 作为上下文
    source_context = ""
    analyzed_files = set()
    for f in findings[:10]:  # 限制上下文窗口
        filepath = f["file"]
        if filepath not in analyzed_files:
            try:
                with open(f"{target_path}/{filepath}", "r") as fh:
                    source_context += f"\n--- {filepath} ---\n{fh.read()[:2000]}\n"
                    analyzed_files.add(filepath)
            except FileNotFoundError:
                pass

    prompt = f"""你是一名安全工程师,正在分析以下静态扫描发现。
对每个发现,判断:
1. 是否为真实漏洞(排除明显误报)
2. 可达性:外部输入是否能到达漏洞点
3. 优先级:high/medium/low

原始发现:
{json.dumps(findings, indent=2)}

相关源代码上下文:
{source_context}

请以 JSON 数组格式输出分析结果,每个元素包含:
rule_id, is_real_vulnerability (bool), reachability (reachable/unreachable/uncertain), priority, reasoning"""

    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        tools=[{
            "name": "read_file",
            "description": "读取目标代码库中的文件内容",
            "input_schema": {
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "相对于目标路径的文件路径"}
                },
                "required": ["path"]
            }
        }],
        messages=[{"role": "user", "content": prompt}]
    )

    # 从响应中提取 JSON 分析结果
    for block in response.content:
        if block.type == "text":
            try:
                # 尝试从文本中解析 JSON
                text = block.text
                json_start = text.find("[")
                json_end = text.rfind("]") + 1
                if json_start >= 0 and json_end > json_start:
                    return json.loads(text[json_start:json_end])
            except json.JSONDecodeError:
                pass
    return []

# 第三步:对确认的漏洞生成补丁建议
def generate_patch(vuln: dict, source_code: str) -> str:
    """为单个确认漏洞生成修复补丁"""
    prompt = f"""为以下漏洞生成最小化修复补丁,以 unified diff 格式输出。
不要改变功能逻辑,只修复安全问题。

漏洞信息:{json.dumps(vuln, indent=2)}

原始代码:
{source_code}

输出格式:直接输出 diff 内容,不要额外解释。"""

    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=2048,
        messages=[{"role": "user", "content": prompt}]
    )
    for block in response.content:
        if block.type == "text":
            return block.text
    return ""

# 运行完整流程
if __name__ == "__main__":
    target = "./my_project"  # 替换为你的目标代码库路径

    print("=== 阶段 1:Semgrep 扫描 ===")
    raw_findings = run_semgrep(target)
    print(f"原始发现数:{len(raw_findings)}")

    print("\n=== 阶段 2:Claude 深度分析 ===")
    analyzed = analyze_findings(raw_findings, target)
    confirmed = [v for v in analyzed if v.get("is_real_vulnerability")]
    print(f"确认漏洞数:{len(confirmed)}")
    for v in confirmed:
        print(f"  - {v['rule_id']}: priority={v['priority']}, reachability={v['reachability']}")

    print("\n=== 阶段 3:补丁生成 ===")
    for vuln in confirmed[:3]:  # 限制演示数量
        filepath = vuln.get("rule_id", "").split(":")[-1] or "unknown"
        try:
            with open(f"{target}/{filepath}", "r") as f:
                source = f.read()
            patch = generate_patch(vuln, source)
            print(f"\n补丁 for {filepath}:")
            print(patch)
        except FileNotFoundError:
            print(f"无法读取 {filepath},跳过")

运行方式:

# 准备一个有已知漏洞的测试项目
mkdir -p my_project
cat > my_project/app.py << 'EOF'
from flask import Flask, request
import sqlite3

app = Flask(__name__)

@app.route("/search")
def search():
    query = request.args.get("q", "")
    conn = sqlite3.connect("db.sqlite3")
    # SQL 注入漏洞:直接拼接用户输入
    result = conn.execute(f"SELECT * FROM items WHERE name LIKE '%{query}%'").fetchall()
    conn.close()
    return str(result)

if __name__ == "__main__":
    app.run(debug=True)
EOF

# 运行 Agent
python vulnerability_agent.py

这个示例展示了 DCRH 的核心骨架:工具扫描 → LLM 筛选分析 → 补丁生成。DCRH 本身在此基础上增加了迭代验证循环、沙箱执行、多工具编排和更严格的结构化约束,但单轮流程的逻辑是一致的。

定制与接入的实际考量

如果你打算基于 DCRH 构建团队内部的漏洞 Agent,有几个现实问题需要提前想清楚:

扫描工具的选择与集成

DCRH 默认用 Semgrep 和 CodeQL,但你的团队可能已经有成熟的扫描工具链。替换工具层不难——项目把工具调用抽象为统一的 tool interface——但你需要确保工具输出的格式能被 Agent 的筛选逻辑消费。建议先跑一遍默认配置,理解数据流后再替换。

Claude 调用成本的控制

深度分析阶段是调用密度最高的环节——每个候选漏洞都需要读取多个文件、追踪调用链。对于大型代码库,一次完整扫描可能消耗数十美元的 API 费用。DCRH 的优先级筛选机制就是为了在进入深度分析前砍掉大量误报,降低成本。如果你定制时跳过筛选直接全量分析,费用会快速膨胀。

误报容忍度的设定

DCRH 的设计偏向"宁可多报不可漏报"——分析阶段对可达性不确定的漏洞标记为 uncertain 而不是直接排除。这对安全团队是合理的(漏报的代价远高于误报),但如果你的场景是开发者的日常自查,可能需要调低阈值,只输出高置信度的发现,避免报告噪音让开发者忽视。

补丁验证的完整性

DCRH 的验证循环包括重新扫描和测试运行,但不包括人工 code review。在真实部署中,Agent 生成的补丁仍然需要安全工程师最终确认——特别是涉及权限模型、加密逻辑等敏感区域时,LLM 的补丁可能"修了表面、漏了根因"。

什么时候值得投入

DCRH 目前最适合两类团队:

  • 安全研究团队:已经在用 Semgrep/CodeQL 等工具,但人工审阅报告的效率瓶颈明显。DCRH 的筛选和分析阶段可以直接提升审阅效率,即使你暂时不用补丁生成功能。
  • Agent 开发者:正在构建安全领域的 AI Agent,需要参考成熟的编排模式、结构化输出设计和沙箱方案。DCRH 的代码比论文更具体,比 demo 更完整。

对于还没有建立静态分析流程的团队,DCRH 不是合适的起点——先建立基础的扫描和审阅流程,再考虑引入 Agent 自动化。

项目地址在 GitHub 上搜索 anthropic/defending-code-reference-harness 即可找到。建议先通读项目的 design decisions 文档,理解每个架构选择背后的原因,再决定哪些部分值得采纳、哪些需要替换。


相关推荐