Agentic 应用正在从"问大模型一个问题"进化到"让 AI 自主规划、调用工具、读写数据并完成多步任务"。但 Agent 要真正可靠地运转,离不开一个能同时支撑分析推理和实时操作的数据底座。Microsoft Build 2026 把这个底座的名字说得很清楚——Microsoft Fabric + Microsoft Databases,一个统一的数据与 AI 平台,让 Agentic 应用从原型走向规模化。
Agentic 应用为什么卡在数据层
大多数 Agent demo 的瓶颈不在模型推理,而在数据访问:
- 数据散落在多个系统——关系库、文档库、实时流、湖仓各管一摊,Agent 每调一个数据源就要写一套适配代码。
- 权限与治理割裂——生产数据不能随便让 Agent 写,但开发环境又没有足够真实的数据来验证推理链路。
- 冷热数据切换成本高——Agent 做历史分析需要海量冷数据,做实时决策又要求毫秒级响应,两套引擎之间来回搬运。
Microsoft Fabric 的思路是把湖仓、实时分析、数据管道、治理和 BI 放进同一个工作区;Microsoft Databases(Cosmos DB、Azure SQL 等)则提供 Agent 运行时需要的低延迟读写和会话状态存储。两者打通后,Agent 不再需要"先 ETL 再推理"。
Fabric + Databases 的关键拼图
Fabric:Agent 的分析大脑
Fabric 内置的 OneLake 是所有工作区共享的单一湖存储,Agent 可以用同一套路径访问 Delta 表、Parquet 文件和实时视图。几个与 Agentic 场景直接相关的组件:
- Data Pipelines / Dataflows Gen2——Agent 可以触发管道刷新数据,也可以把管道输出作为推理输入。
- Real-Time Intelligence (Eventhouse / KQL Database)——Agent 做实时监控时,直接用 Kusto 查询拿到最近 N 分钟的指标,不需要额外搭流处理。
- Copilot in Fabric——Agent 生成 SQL / KQL 查询时可以借助 Fabric 内置的 Copilot 校验语法和语义,降低"幻觉查询"的风险。
Databases:Agent 的操作手和记忆库
- Azure Cosmos DB——存储 Agent 的会话状态、工具调用日志、用户偏好;NoSQL 模式天然适配半结构化的 Agent 消息流。支持全局分布,Agent 在多区域部署时状态就近读写。
- Azure SQL Database——Agent 需要事务性写入(比如下单、更新库存)时走 SQL,保证一致性。
- Azure Database for PostgreSQL / MySQL——已有业务库直接接入,Agent 不需要迁移数据。
统一平台的核心收益:Agent 的分析读路径走 Fabric OneLake,操作写路径走 Azure Databases,治理和权限在同一套 Microsoft Entra ID 体系下管控。
实战:用 Semantic Kernel + Cosmos DB 构建一个数据驱动 Agent
下面给出一个最小可运行的 Python 示例——一个能查询 Fabric 湖表历史数据、同时把会话状态写入 Cosmos DB 的 Agent。假设你已经有一个 Azure Cosmos DB (NoSQL API) 实例和一个 Fabric 工作区中的 Delta 表。
环境准备
# 安装依赖
pip install semantic-kernel azure-cosmosdb-python openai
# 设置环境变量(按你的实际资源替换)
export AZURE_OPENAI_ENDPOINT="https://your-openai.openai.azure.com/"
export AZURE_OPENAI_API_KEY="your-key"
export AZURE_OPENAI_DEPLOYMENT="gpt-4.1"
export COSMOS_ENDPOINT="https://your-cosmos.documents.azure.com:443/"
export COSMOS_KEY="your-cosmos-key"
export COSMOS_DATABASE="AgentStateDB"
export COSMOS_CONTAINER="Sessions"
Agent 代码
import os
import json
import uuid
from datetime import datetime
from azure.cosmos import CosmosClient, PartitionKey
from semantic_kernel import Kernel
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.agents import ChatCompletionAgent
from semantic_kernel.functions import kernel_function
# ── 1. Cosmos DB 会话状态存储 ──────────────────────────────
class SessionStore:
"""把 Agent 的每轮对话和工具调用记录写入 Cosmos DB"""
def __init__(self):
client = CosmosClient(
os.environ["COSMOS_ENDPOINT"], os.environ["COSMOS_KEY"]
)
db = client.get_database_client(os.environ["COSMOS_DATABASE"])
# 如果容器不存在就创建
try:
db.create_container(
id=os.environ["COSMOS_CONTAINER"],
partition_key=PartitionKey(path="/user_id"),
)
except Exception:
pass # 容器已存在
self.container = db.get_container_client(os.environ["COSMOS_CONTAINER"])
def save(self, user_id: str, role: str, content: str, metadata: dict = None):
doc = {
"id": str(uuid.uuid4()),
"user_id": user_id,
"role": role,
"content": content,
"timestamp": datetime.utcnow().isoformat(),
"metadata": metadata or {},
}
self.container.upsert_item(doc)
return doc["id"]
def get_recent(self, user_id: str, limit: int = 10):
query = f"""
SELECT TOP {limit} c.role, c.content, c.timestamp
FROM c WHERE c.user_id = '{user_id}'
ORDER BY c.timestamp DESC
"""
return list(self.container.query_items(query, enable_cross_partition_query=False))
# ── 2. 模拟 Fabric 数据查询的插件 ────────────────────────────
class FabricDataPlugin:
"""实际生产中,这里会调用 Fabric REST API 或 OneLake Delta 表读取。
示例用模拟数据演示 Agent 的调用链路。"""
@kernel_function(
description="查询最近 N 天的销售汇总数据,返回按日期聚合的营收和订单数"
)
def get_sales_summary(self, days: int = 7) -> str:
# 生产替换:用 pyarrow / delta-python 读 OneLake 中的 Delta 表
# from deltalake import DeltaTable
# table = DeltaTable("abfss://workspace@onelake.dfs.core.windows.net/sales/lakehouse/Tables/sales_fact")
# df = table.to_pandas().tail(days * 24) ...
mock_data = [
{"date": "2026-05-19", "revenue": 128400, "orders": 342},
{"date": "2026-05-18", "revenue": 115200, "orders": 310},
{"date": "2026-05-17", "revenue": 98700, "orders": 278},
]
return json.dumps(mock_data, ensure_ascii=False)
@kernel_function(
description="查询指定 SKU 的库存余量(从 Azure SQL 业务库读取)"
)
def get_inventory(self, sku: str) -> str:
# 生产替换:用 pyodbc 连 Azure SQL 执行 SELECT
mock = {"sku": sku, "qty_on_hand": 520, "warehouse": "SH-03"}
return json.dumps(mock, ensure_ascii=False)
# ── 3. 组装 Agent ────────────────────────────────────────────
def build_agent() -> ChatCompletionAgent:
kernel = Kernel()
kernel.add_service(
AzureChatCompletion(
service_id="azure-openai",
deployment_name=os.environ["AZURE_OPENAI_DEPLOYMENT"],
endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
api_key=os.environ["AZURE_OPENAI_API_KEY"],
)
)
kernel.add_plugin(FabricDataPlugin(), plugin_name="fabric_data")
agent = ChatCompletionAgent(
kernel=kernel,
name="SalesAnalyst",
instructions="""你是一个销售分析 Agent。
- 用户问销售趋势时,调用 get_sales_summary 获取数据再分析。
- 用户问库存时,调用 get_inventory 查询具体 SKU。
- 回答要包含数据引用,不要凭空编数字。
- 如果数据不足,明确告诉用户并建议扩大查询范围。""",
)
return agent
# ── 4. 运行 ──────────────────────────────────────────────────
async def main():
store = SessionStore()
agent = build_agent()
user_id = "user-001"
# 加载历史会话作为上下文
history = store.get_recent(user_id, limit=6)
context_msgs = [f"{h['role']}: {h['content']}" for h in history]
questions = [
"最近 3 天销售情况怎么样?",
"SKU-A1001 还有多少库存?",
]
for q in questions:
store.save(user_id, "user", q)
full_prompt = "\n".join(context_msgs) + f"\nuser: {q}"
response = await agent.invoke(full_prompt)
store.save(user_id, "assistant", str(response), metadata={"source": "agent"})
print(f"Q: {q}\nA: {response}\n---")
context_msgs.append(f"user: {q}")
context_msgs.append(f"assistant: {response}")
if __name__ == "__main__":
import asyncio
asyncio.run(main())
运行前要改的地方:
- 四个环境变量替换成你自己的 Azure OpenAI 和 Cosmos DB 资源。
FabricDataPlugin里的模拟数据替换成真实的 Fabric REST API 或 Delta 表读取——注释中已给出deltalake库的接入方式,OneLake 的 ABFSS 路径格式为abfss://<workspace>@onelake.dfs.core.windows.net/<lakehouse>/Tables/<table>。- 库存查询替换成
pyodbc连 Azure SQL 的真实 SQL 语句。
这个示例的核心意图不是模拟数据本身,而是展示 Agent → Kernel Plugin → 数据源 的调用链路,以及 每轮对话自动持久化到 Cosmos DB 的状态管理模式。生产部署时,把插件里的数据读取换成真实 API,Agent 的推理逻辑不需要改动。
落地前的考量与清单
把 Agentic 应用从 demo 推到生产,Fabric + Databases 统一平台解决了数据散落问题,但还有几件事要提前想清楚:
| 维度 | 要做的事 | 常见坑 |
|---|---|---|
| 权限 | Agent 用 Microsoft Entra ID 的服务主体访问 Fabric 和数据库,按最小权限授 Data Reader / Contributor | 给了 Contributor 导致 Agent 能删表 |
| 数据边界 | 明确哪些表 Agent 只读(分析用),哪些可写(状态、日志),写路径走 Cosmos DB 或 SQL 的事务接口 | Agent 直接写 OneLake Delta 表,破坏湖仓治理链路 |
| 成本 | Cosmos DB 按 RU 计费,Agent 高频写会话日志时选 Serverless 或设置 RU 上限;Fabric 容量按 CU 消耗,Agent 触发管道刷新要评估频率 | Agent 每轮对话都全量刷新管道,CU 爆表 |
| 观测 | Agent 的工具调用记录写入 Cosmos DB 同一个容器,用 KQL 或 Power BI 做调用成功率/延迟看板 | 只看模型输出,不看工具调用失败率 |
| 多 Agent 协调 | 多个 Agent 共享同一 Cosmos DB 容器时,用 agent_id 作为第二分区键避免交叉污染 |
所有 Agent 写同一个分区,查询时全表扫描 |
一句话总结: Fabric 给 Agent 提供了统一的分析读路径,Azure Databases 给 Agent 提供了低延迟的操作写路径和状态存储。两条路径在同一套身份和治理体系下运转,Agentic 应用才有可能从"能跑"变成"能规模化跑"。