用 Microsoft Fabric 和 Azure 数据库搭建 Agentic 应用:统一数据与 AI 的实战路径

2026-06-03 24 预计阅读时间:1 分钟
来源:azure.microsoft.com AI 摘要 原文链接

免责声明:本文为 AI 摘要整理,建议结合原文阅读。摘要可能省略上下文、版本差异或边界条件,不作为官方说明。

预计阅读时间:11 分钟

Agentic 应用正在从"问大模型一个问题"进化到"让 AI 自主规划、调用工具、读写数据并完成多步任务"。但 Agent 要真正可靠地运转,离不开一个能同时支撑分析推理和实时操作的数据底座。Microsoft Build 2026 把这个底座的名字说得很清楚——Microsoft Fabric + Microsoft Databases,一个统一的数据与 AI 平台,让 Agentic 应用从原型走向规模化。

Agentic 应用为什么卡在数据层

大多数 Agent demo 的瓶颈不在模型推理,而在数据访问:

  • 数据散落在多个系统——关系库、文档库、实时流、湖仓各管一摊,Agent 每调一个数据源就要写一套适配代码。
  • 权限与治理割裂——生产数据不能随便让 Agent 写,但开发环境又没有足够真实的数据来验证推理链路。
  • 冷热数据切换成本高——Agent 做历史分析需要海量冷数据,做实时决策又要求毫秒级响应,两套引擎之间来回搬运。

Microsoft Fabric 的思路是把湖仓、实时分析、数据管道、治理和 BI 放进同一个工作区;Microsoft Databases(Cosmos DB、Azure SQL 等)则提供 Agent 运行时需要的低延迟读写和会话状态存储。两者打通后,Agent 不再需要"先 ETL 再推理"。

Fabric + Databases 的关键拼图

Fabric:Agent 的分析大脑

Fabric 内置的 OneLake 是所有工作区共享的单一湖存储,Agent 可以用同一套路径访问 Delta 表、Parquet 文件和实时视图。几个与 Agentic 场景直接相关的组件:

  • Data Pipelines / Dataflows Gen2——Agent 可以触发管道刷新数据,也可以把管道输出作为推理输入。
  • Real-Time Intelligence (Eventhouse / KQL Database)——Agent 做实时监控时,直接用 Kusto 查询拿到最近 N 分钟的指标,不需要额外搭流处理。
  • Copilot in Fabric——Agent 生成 SQL / KQL 查询时可以借助 Fabric 内置的 Copilot 校验语法和语义,降低"幻觉查询"的风险。

Databases:Agent 的操作手和记忆库

  • Azure Cosmos DB——存储 Agent 的会话状态、工具调用日志、用户偏好;NoSQL 模式天然适配半结构化的 Agent 消息流。支持全局分布,Agent 在多区域部署时状态就近读写。
  • Azure SQL Database——Agent 需要事务性写入(比如下单、更新库存)时走 SQL,保证一致性。
  • Azure Database for PostgreSQL / MySQL——已有业务库直接接入,Agent 不需要迁移数据。

统一平台的核心收益:Agent 的分析读路径走 Fabric OneLake,操作写路径走 Azure Databases,治理和权限在同一套 Microsoft Entra ID 体系下管控

实战:用 Semantic Kernel + Cosmos DB 构建一个数据驱动 Agent

下面给出一个最小可运行的 Python 示例——一个能查询 Fabric 湖表历史数据、同时把会话状态写入 Cosmos DB 的 Agent。假设你已经有一个 Azure Cosmos DB (NoSQL API) 实例和一个 Fabric 工作区中的 Delta 表。

环境准备

# 安装依赖
pip install semantic-kernel azure-cosmosdb-python openai

# 设置环境变量(按你的实际资源替换)
export AZURE_OPENAI_ENDPOINT="https://your-openai.openai.azure.com/"
export AZURE_OPENAI_API_KEY="your-key"
export AZURE_OPENAI_DEPLOYMENT="gpt-4.1"
export COSMOS_ENDPOINT="https://your-cosmos.documents.azure.com:443/"
export COSMOS_KEY="your-cosmos-key"
export COSMOS_DATABASE="AgentStateDB"
export COSMOS_CONTAINER="Sessions"

Agent 代码

import os
import json
import uuid
from datetime import datetime

from azure.cosmos import CosmosClient, PartitionKey
from semantic_kernel import Kernel
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.agents import ChatCompletionAgent
from semantic_kernel.functions import kernel_function


# ── 1. Cosmos DB 会话状态存储 ──────────────────────────────
class SessionStore:
    """把 Agent 的每轮对话和工具调用记录写入 Cosmos DB"""

    def __init__(self):
        client = CosmosClient(
            os.environ["COSMOS_ENDPOINT"], os.environ["COSMOS_KEY"]
        )
        db = client.get_database_client(os.environ["COSMOS_DATABASE"])
        # 如果容器不存在就创建
        try:
            db.create_container(
                id=os.environ["COSMOS_CONTAINER"],
                partition_key=PartitionKey(path="/user_id"),
            )
        except Exception:
            pass  # 容器已存在
        self.container = db.get_container_client(os.environ["COSMOS_CONTAINER"])

    def save(self, user_id: str, role: str, content: str, metadata: dict = None):
        doc = {
            "id": str(uuid.uuid4()),
            "user_id": user_id,
            "role": role,
            "content": content,
            "timestamp": datetime.utcnow().isoformat(),
            "metadata": metadata or {},
        }
        self.container.upsert_item(doc)
        return doc["id"]

    def get_recent(self, user_id: str, limit: int = 10):
        query = f"""
        SELECT TOP {limit} c.role, c.content, c.timestamp
        FROM c WHERE c.user_id = '{user_id}'
        ORDER BY c.timestamp DESC
        """
        return list(self.container.query_items(query, enable_cross_partition_query=False))


# ── 2. 模拟 Fabric 数据查询的插件 ────────────────────────────
class FabricDataPlugin:
    """实际生产中,这里会调用 Fabric REST API 或 OneLake Delta 表读取。
    示例用模拟数据演示 Agent 的调用链路。"""

    @kernel_function(
        description="查询最近 N 天的销售汇总数据,返回按日期聚合的营收和订单数"
    )
    def get_sales_summary(self, days: int = 7) -> str:
        # 生产替换:用 pyarrow / delta-python 读 OneLake 中的 Delta 表
        # from deltalake import DeltaTable
        # table = DeltaTable("abfss://workspace@onelake.dfs.core.windows.net/sales/lakehouse/Tables/sales_fact")
        # df = table.to_pandas().tail(days * 24) ...
        mock_data = [
            {"date": "2026-05-19", "revenue": 128400, "orders": 342},
            {"date": "2026-05-18", "revenue": 115200, "orders": 310},
            {"date": "2026-05-17", "revenue": 98700,  "orders": 278},
        ]
        return json.dumps(mock_data, ensure_ascii=False)

    @kernel_function(
        description="查询指定 SKU 的库存余量(从 Azure SQL 业务库读取)"
    )
    def get_inventory(self, sku: str) -> str:
        # 生产替换:用 pyodbc 连 Azure SQL 执行 SELECT
        mock = {"sku": sku, "qty_on_hand": 520, "warehouse": "SH-03"}
        return json.dumps(mock, ensure_ascii=False)


# ── 3. 组装 Agent ────────────────────────────────────────────
def build_agent() -> ChatCompletionAgent:
    kernel = Kernel()
    kernel.add_service(
        AzureChatCompletion(
            service_id="azure-openai",
            deployment_name=os.environ["AZURE_OPENAI_DEPLOYMENT"],
            endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
            api_key=os.environ["AZURE_OPENAI_API_KEY"],
        )
    )
    kernel.add_plugin(FabricDataPlugin(), plugin_name="fabric_data")

    agent = ChatCompletionAgent(
        kernel=kernel,
        name="SalesAnalyst",
        instructions="""你是一个销售分析 Agent。
- 用户问销售趋势时,调用 get_sales_summary 获取数据再分析。
- 用户问库存时,调用 get_inventory 查询具体 SKU。
- 回答要包含数据引用,不要凭空编数字。
- 如果数据不足,明确告诉用户并建议扩大查询范围。""",
    )
    return agent


# ── 4. 运行 ──────────────────────────────────────────────────
async def main():
    store = SessionStore()
    agent = build_agent()
    user_id = "user-001"

    # 加载历史会话作为上下文
    history = store.get_recent(user_id, limit=6)
    context_msgs = [f"{h['role']}: {h['content']}" for h in history]

    questions = [
        "最近 3 天销售情况怎么样?",
        "SKU-A1001 还有多少库存?",
    ]

    for q in questions:
        store.save(user_id, "user", q)
        full_prompt = "\n".join(context_msgs) + f"\nuser: {q}"
        response = await agent.invoke(full_prompt)
        store.save(user_id, "assistant", str(response), metadata={"source": "agent"})
        print(f"Q: {q}\nA: {response}\n---")
        context_msgs.append(f"user: {q}")
        context_msgs.append(f"assistant: {response}")


if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

运行前要改的地方:

  1. 四个环境变量替换成你自己的 Azure OpenAI 和 Cosmos DB 资源。
  2. FabricDataPlugin 里的模拟数据替换成真实的 Fabric REST API 或 Delta 表读取——注释中已给出 deltalake 库的接入方式,OneLake 的 ABFSS 路径格式为 abfss://<workspace>@onelake.dfs.core.windows.net/<lakehouse>/Tables/<table>
  3. 库存查询替换成 pyodbc 连 Azure SQL 的真实 SQL 语句。

这个示例的核心意图不是模拟数据本身,而是展示 Agent → Kernel Plugin → 数据源 的调用链路,以及 每轮对话自动持久化到 Cosmos DB 的状态管理模式。生产部署时,把插件里的数据读取换成真实 API,Agent 的推理逻辑不需要改动。

落地前的考量与清单

把 Agentic 应用从 demo 推到生产,Fabric + Databases 统一平台解决了数据散落问题,但还有几件事要提前想清楚:

维度 要做的事 常见坑
权限 Agent 用 Microsoft Entra ID 的服务主体访问 Fabric 和数据库,按最小权限授 Data Reader / Contributor 给了 Contributor 导致 Agent 能删表
数据边界 明确哪些表 Agent 只读(分析用),哪些可写(状态、日志),写路径走 Cosmos DB 或 SQL 的事务接口 Agent 直接写 OneLake Delta 表,破坏湖仓治理链路
成本 Cosmos DB 按 RU 计费,Agent 高频写会话日志时选 Serverless 或设置 RU 上限;Fabric 容量按 CU 消耗,Agent 触发管道刷新要评估频率 Agent 每轮对话都全量刷新管道,CU 爆表
观测 Agent 的工具调用记录写入 Cosmos DB 同一个容器,用 KQL 或 Power BI 做调用成功率/延迟看板 只看模型输出,不看工具调用失败率
多 Agent 协调 多个 Agent 共享同一 Cosmos DB 容器时,用 agent_id 作为第二分区键避免交叉污染 所有 Agent 写同一个分区,查询时全表扫描

一句话总结: Fabric 给 Agent 提供了统一的分析读路径,Azure Databases 给 Agent 提供了低延迟的操作写路径和状态存储。两条路径在同一套身份和治理体系下运转,Agentic 应用才有可能从"能跑"变成"能规模化跑"。


相关推荐