Agent 前端的流式困境与 VAPD AgentKit 的解法

2026-05-21 28 预计阅读时间:1 分钟
来源:my.oschina.net AI 摘要 原文链接

免责声明:本文为 AI 摘要整理,建议结合原文阅读。摘要可能省略上下文、版本差异或边界条件,不作为官方说明。

预计阅读时间:12 分钟

做 Agent 产品的人大概都踩过同一个坑:后端换了协议,前端就要重写渲染逻辑。OpenAI 用 SSE,Anthropic 用自己的 event stream,自研模型可能直接推 WebSocket frame——每种后端的事件格式、分帧规则、工具调用信令都不一样。前端如果逐个适配,代码很快就会变成一坨协议耦合的意大利面条。

VAPD AgentKit 一期聚焦 Chat 模式,给出的核心思路是:统一消息模型 + Runtime Adapter + 前端编排。只要后端能以流式输出事件,前端就通过 Adapter 转成标准模型,再交给编排层统一渲染。协议无关,面向任意后端流。

下面拆开看这套设计怎么落地。

统一消息模型:先把"说什么"定下来

不同后端对"一条消息"的定义千差万别——有的把 tool_call 嵌在 delta 里,有的单独推一个 event type,有的把中间思考过程塞进 metadata。AgentKit 的做法是先定义一套前端自己的消息模型,所有后端事件进来后都映射到这个模型上。

核心字段大致如下:

// 统一消息模型 —— 前端只认这个结构
interface AgentMessage {
  id: string;
  role: "user" | "assistant" | "tool";
  content: string;           // 文本内容,流式追加
  status: "pending" | "running" | "done" | "error";

  // 工具调用相关
  toolCalls?: ToolCall[];
  toolResult?: string;

  // 元信息:模型名、耗时、token 用量等
  metadata?: Record<string, unknown>;
}

interface ToolCall {
  id: string;
  name: string;              // 工具名,如 "search_web"
  args: string;              // JSON 字符串,流式可能先为空再逐步填充
  result?: string;           // 工具返回结果
  status: "pending" | "running" | "done" | "error";
}

关键设计点:

  • status 字段贯穿始终。一条消息从 pendingrunningdone,前端可以根据状态切换 UI——loading 动画、折叠展开、错误重试,全靠这个字段驱动,不需要额外猜后端"是否结束"。
  • toolCalls 是数组。Agent 一轮对话可能连续调多个工具,每个工具独立追踪状态,前端可以逐个渲染进度卡片。
  • content 支持流式追加。Adapter 层负责把后端的 delta 拼接到 content 上,渲染层只管读最新值。

Runtime Adapter:协议无关的流式翻译器

这是整套架构最关键的一层。Adapter 的职责是:吃进任意格式的后端事件流,吐出标准 AgentMessage 事件

抽象接口大概是这样:

// Adapter 只关心两件事:连接后端、输出标准事件
interface RuntimeAdapter {
  // 发送用户消息,启动一轮 Agent 运行
  start(input: { messages: AgentMessage[] }): void;

  // 前端订阅标准事件流
  onEvent(handler: (event: AgentEvent) => void): void;

  // 中断当前运行(用户取消、超时等)
  abort(): void;

  // 销毁连接
  destroy(): void;
}

// 标准事件 —— 前端编排层只消费这个
type AgentEvent =
  | { type: "message_start"; message: AgentMessage }
  | { type: "content_delta"; messageId: string; delta: string }
  | { type: "tool_call_start"; messageId: string; toolCall: ToolCall }
  | { type: "tool_call_delta"; messageId: string; toolCallId: string; delta: string }
  | { type: "tool_result"; messageId: string; toolCallId: string; result: string }
  | { type: "message_done"; messageId: string }
  | { type: "error"; messageId: string; error: string };

前端编排层永远只监听 AgentEvent,完全不知道后端是 SSE 还是 WebSocket。换后端?换 Adapter 实现,编排层代码一行不改。

下面给一个 SSE 后端的 Adapter 实现示例,可以直接改造使用:

// SSE Adapter —— 对接 OpenAI 兼容的 /chat/completions 流式接口
class SSEAdapter implements RuntimeAdapter {
  private controller: AbortController | null = null;
  private handler: ((event: AgentEvent) => void) | null = null;

  onEvent(handler: (event: AgentEvent) => void): void {
    this.handler = handler;
  }

  async start(input: { messages: AgentMessage[] }): void {
    this.controller = new AbortController();
    const currentMsgId = crypto.randomUUID();

    // 通知编排层:新消息开始
    this.handler?.({
      type: "message_start",
      message: {
        id: currentMsgId,
        role: "assistant",
        content: "",
        status: "running",
      },
    });

    try {
      const resp = await fetch("/api/chat/completions", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          model: "gpt-4o",
          stream: true,
          messages: input.messages.map((m) => ({
            role: m.role,
            content: m.content,
          })),
        }),
        signal: this.controller.signal,
      });

      // 解析 SSE 流
      const reader = resp.body!.getReader();
      const decoder = new TextDecoder();
      let buffer = "";

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split("\n");
        buffer = lines.pop()!; // 保留未完成的行

        for (const line of lines) {
          if (!line.startsWith("data: ")) continue;
          const raw = line.slice(6);
          if (raw === "[DONE]") {
            this.handler?.({ type: "message_done", messageId: currentMsgId });
            return;
          }

          const chunk = JSON.parse(raw);
          const delta = chunk.choices?.[0]?.delta;

          // 文本增量
          if (delta?.content) {
            this.handler?.({
              type: "content_delta",
              messageId: currentMsgId,
              delta: delta.content,
            });
          }

          // 工具调用增量(OpenAI 格式)
          if (delta?.tool_calls) {
            for (const tc of delta.tool_calls) {
              if (tc.function?.name) {
                this.handler?.({
                  type: "tool_call_start",
                  messageId: currentMsgId,
                  toolCall: {
                    id: tc.id,
                    name: tc.function.name,
                    args: "",
                    status: "running",
                  },
                });
              }
              if (tc.function?.arguments) {
                this.handler?.({
                  type: "tool_call_delta",
                  messageId: currentMsgId,
                  toolCallId: tc.id,
                  delta: tc.function.arguments,
                });
              }
            }
          }
        }
      }
    } catch (err: any) {
      if (err.name !== "AbortError") {
        this.handler?.({
          type: "error",
          messageId: currentMsgId,
          error: err.message,
        });
      }
    }
  }

  abort(): void {
    this.controller?.abort();
  }

  destroy(): void {
    this.abort();
    this.handler = null;
  }
}

如果后端换成 WebSocket,只需要写一个 WSAdapter,内部把 WS frame 映射成 AgentEvent,对外接口完全一致。编排层和渲染组件零改动。

前端编排:把事件流变成可交互的 UI

有了标准事件流,编排层的工作就是维护消息列表状态,并根据事件类型驱动 UI 更新。一个最小可运行的编排器:

// 编排器 —— 维护消息列表,消费 AgentEvent,驱动 UI 重渲染
function useAgentChat(adapter: RuntimeAdapter) {
  const messages = ref<AgentMessage[]>([]);
  const isRunning = ref(false);

  // 订阅事件,更新消息列表
  adapter.onEvent((event) => {
    switch (event.type) {
      case "message_start":
        messages.value.push(event.message);
        isRunning.value = true;
        break;

      case "content_delta":
        const msg = messages.value.find((m) => m.id === event.messageId);
        if (msg) msg.content += event.delta;
        break;

      case "tool_call_start":
        const parent = messages.value.find((m) => m.id === event.messageId);
        if (parent) {
          parent.toolCalls = [...(parent.toolCalls ?? []), event.toolCall];
        }
        break;

      case "tool_call_delta":
        const tc = messages.value
          .find((m) => m.id === event.messageId)
          ?.toolCalls?.find((t) => t.id === event.toolCallId);
        if (tc) tc.args += event.delta;
        break;

      case "tool_result":
        const tool = messages.value
          .find((m) => m.id === event.messageId)
          ?.toolCalls?.find((t) => t.id === event.toolCallId);
        if (tool) {
          tool.result = event.result;
          tool.status = "done";
        }
        break;

      case "message_done":
        const doneMsg = messages.value.find((m) => m.id === event.messageId);
        if (doneMsg) doneMsg.status = "done";
        isRunning.value = false;
        break;

      case "error":
        const errMsg = messages.value.find((m) => m.id === event.messageId);
        if (errMsg) errMsg.status = "error";
        isRunning.value = false;
        break;
    }
  });

  // 发送用户消息
  function send(text: string) {
    const userMsg: AgentMessage = {
      id: crypto.randomUUID(),
      role: "user",
      content: text,
      status: "done",
    };
    messages.value.push(userMsg);
    adapter.start({ messages: messages.value });
  }

  // 取消当前运行
  function cancel() {
    adapter.abort();
    isRunning.value = false;
  }

  return { messages, isRunning, send, cancel };
}

渲染层拿到 messagesisRunning,就可以按角色和状态做差异化渲染:assistant 消息用 Markdown 渲染、tool_call 显示为可折叠的卡片、error 状态加重试按钮。所有这些 UI 决策只依赖 AgentMessage 的字段,和后端协议彻底解耦。

Agent 多轮循环的处理

Agent 不像普通 Chat 那样一问一答就结束。一次用户输入可能触发 assistant 思考 → 调工具 → 拿结果 → 再思考 → 再调工具 → 最终回复,整个循环在前端表现为一条 assistant 消息内嵌多轮 tool_call

AgentKit 的编排方式是:同一个 messageId 下,tool_calls 数组持续追加,每个 ToolCall 独立追踪状态。前端渲染时可以:

  • 按时间线展示每个工具调用的起止和结果
  • 正在运行的工具显示 spinner
  • 完成的工具折叠详情,只展示摘要
  • 最终文本回复正常 Markdown 渲染

这样用户看到的是一个连贯的"思考过程",而不是被工具调用打断的碎片化体验。

落地时需要注意的几个点

  1. Adapter 不是万能的。后端如果根本不支持流式(比如一次性返回完整 JSON),你可以在 Adapter 里用 setTimeout 模拟分帧输出,但这只是体验优化,不是真正的流。真正的流式需要后端配合。

  2. 中断语义要和后端对齐。用户点击取消时,前端调 adapter.abort(),但后端是否真的停了计算?SSE 场景下断开连接后服务端可能还在跑。如果后端支持 /cancel 接口,Adapter 的 abort() 应该同时发取消请求。

  3. tool_call 的 args 流式拼接有坑。OpenAI 的 tool_call arguments 是分多个 delta 过来的 JSON 片段,直接拼接可能在中间状态不是合法 JSON。前端渲染 args 时要做好容错——未完成时显示原始文本,完成后才 parse 成结构化展示。

  4. 消息 ID 的稳定性。流式场景下,同一轮 assistant 消息的所有事件必须共享同一个 messageId。如果后端不提供稳定 ID(有些 SSE 实现每帧都换 ID),Adapter 层需要自己生成并维持,否则编排器会创建多条重复消息。

  5. 多 Agent 场景的扩展。一期只做了 Chat 模式,但 Adapter + 统一模型的架构天然支持扩展——多个 Agent 各有自己的 Adapter 实例,编排层按 agentId 区分事件流,就可以渲染多 Agent 协作的 UI。


VAPD AgentKit 一期的价值不在于它定义了多么复杂的编排能力,而在于它把协议适配UI 编排之间的耦合彻底切断。写一个 Adapter,后端随便换;消费标准事件,渲染随便改。对于正在做多模型、多后端 Agent 产品的团队来说,这个分层思路值得直接拿去用——哪怕不采用 AgentKit 本身,"统一消息模型 + 流式翻译层 + 编排消费层"这个三件套,也能让你的前端代码从协议泥潭里拔出来。


相关推荐