很多团队一提到"数据架构",就打开绘图工具画几张 ER 图,然后觉得任务完成了。实际上,从业务系统研发的视角看,数据架构要解决的问题远不止表结构设计——它决定了数据怎么产生、怎么流转、怎么存储、怎么被消费,以及当业务规模翻倍时,系统还能不能扛住。
本文聚焦业务系统研发视角的数据架构。如果站在数据平台的角度,数据架构还会涉及数据标准、数据质量等更广泛的治理领域,但那是另一个话题。
数据架构到底管什么
在业务系统中,数据架构至少要回答四个问题:
数据从哪来? 用户输入、第三方接口、日志采集、定时任务批量导入——不同来源的数据,可靠性、格式、延迟特征完全不同。架构设计需要明确每种来源的接入方式和容错策略。
数据怎么存? 关系型数据库、文档数据库、缓存、对象存储、列式存储——选型不是"哪个技术更先进",而是"哪种存储模型匹配数据的读写特征"。高频写入的流水数据和高频读取的维度数据,存储策略应该不同。
数据怎么流转? 同步还是异步?实时还是批量?数据从一个服务到另一个服务,中间是否需要清洗、转换、聚合?流转路径决定了系统的延迟边界和故障传播范围。
数据怎么被消费? API 查询、报表导出、搜索引擎索引、机器学习特征——消费端的需求直接约束了数据的生产和存储方式。
把这四个问题串起来,就是一条完整的数据生命周期链路。数据架构的本质,是对这条链路做有意识的规划,而不是让每个模块各自为政。
一个常见的反面模式
不少项目是这样演进的:
- 业务初期,所有数据塞进一张大宽表,查询简单,开发快。
- 业务增长,宽表字段膨胀到上百个,写入变慢,索引难以维护。
- 为了"解耦",开始把数据同步到另一个库,但没有统一的同步机制,各团队自己写脚本,数据一致性全靠人肉核对。
- 报表需求来了,又从业务库直接拉数据做聚合,高峰期把业务库拖垮。
这个路径的核心问题:每一步都是局部最优,但整体没有架构约束。数据架构的价值,就是在第一步时就预留演进空间,而不是等到第四步才补窟窿。
实践:用分层模型组织业务数据
一个在业务系统中行之有效的数据架构模式是分层存储——把数据按生命周期和读写特征分成三层:
| 层 | 特征 | 典型存储 | 例子 |
|---|---|---|---|
| 操作层 | 高频写入、实时查询、强一致性 | PostgreSQL / MySQL | 订单、账户、库存 |
| 分析层 | 批量写入、聚合查询、容忍延迟 | ClickHouse / Doris | 日统计、趋势报表 |
| 影像层 | 低频写入、长期保留、归档查询 | 对象存储 + Parquet | 历史订单、合规留存 |
下面用一个 Python 项目结构 + 数据同步配置来展示这种分层怎么落地。
项目结构示例
data-arch-demo/
├── config/
│ ├── sync_pipeline.yaml # 数据同步配置
│ └── storage_routing.yaml # 存储路由规则
├── models/
│ ├── operational.py # 操作层模型定义
│ ├── analytical.py # 分析层模型定义
│ └── archival.py # 影像层模型定义
├── pipelines/
│ ├── op_to_analytical.py # 操作层 → 分析层同步
│ ├── op_to_archival.py # 操作层 → 影像层归档
│ └── quality_check.py # 数据质量校验
└── main.py
存储路由配置
# config/storage_routing.yaml
# 定义不同数据域的存储路由规则
routing:
order:
operational:
engine: postgresql
database: biz_ops
consistency: strong
retention: 90d # 操作层只保留近期数据
analytical:
engine: clickhouse
database: biz_analytics
consistency: eventual
sync_mode: realtime # 实时同步,延迟 < 5s
retention: 365d
archival:
engine: s3_parquet # 对象存储 + Parquet 格式
bucket: data-archive
partition: date # 按日期分区
sync_mode: daily_batch # 每日批量归档
retention: 7y
user_profile:
operational:
engine: postgresql
database: biz_ops
consistency: strong
retention: permanent # 用户资料长期保留在操作层
analytical:
engine: clickhouse
database: biz_analytics
consistency: eventual
sync_mode: hourly_batch
retention: 365d
archival:
skip: true # 用户资料不需要额外归档
这个配置的关键设计意图:
- 操作层保留期短:热数据只留 90 天,避免业务库膨胀。超过 90 天的订单详情从影像层查询。
- 同步模式按需求选:订单数据需要近实时分析,用 realtime;用户画像更新频率低,用 hourly_batch。
- 不是所有数据都要三层:用户资料已经在操作层长期保留,归档层可以跳过。
同步管道代码
# pipelines/op_to_analytical.py
"""操作层 → 分析层:近实时同步管道"""
import time
from datetime import datetime, timedelta
from sqlalchemy import create_engine, text
from clickhouse_driver import Client
# 从配置文件加载,这里用硬编码演示
PG_URL = "postgresql://user:pass@localhost:5432/biz_ops"
CH_URL = "http://localhost:8123"
pg_engine = create_engine(PG_URL)
ch_client = Client(host="localhost", port=9000, database="biz_analytics")
SYNC_WATERMARK = {} # 记录每张表上次同步的位置
def sync_orders(batch_size=5000, lag_seconds=10):
"""将操作层订单增量同步到分析层"""
# 用更新时间作为水位线,避免漏同步
cutoff = datetime.utcnow() - timedelta(seconds=lag_seconds)
last_sync = SYNC_WATERMARK.get("orders", datetime.utcnow() - timedelta(hours=1))
if last_sync >= cutoff:
print("[sync] 无新数据需要同步")
return 0
query = text("""
SELECT order_id, user_id, amount, status, created_at, updated_at
FROM orders
WHERE updated_at > :last_sync AND updated_at <= :cutoff
ORDER BY updated_at
LIMIT :batch_size
""")
with pg_engine.connect() as conn:
rows = conn.execute(query, {
"last_sync": last_sync,
"cutoff": cutoff,
"batch_size": batch_size,
}).fetchall()
if not rows:
SYNC_WATERMARK["orders"] = cutoff
return 0
# 写入 ClickHouse
ch_client.execute(
"INSERT INTO orders_analytical VALUES",
rows # 直接传入 tuple 列表
)
# 推进水位线
SYNC_WATERMARK["orders"] = cutoff
print(f"[sync] 已同步 {len(rows)} 条订单,水位线推进到 {cutoff}")
return len(rows)
if __name__ == "__main__":
# 模拟持续同步循环
while True:
sync_orders()
time.sleep(5) # 每 5 秒轮询一次
运行前需要准备:
- PostgreSQL 中有
orders表,包含order_id, user_id, amount, status, created_at, updated_at字段。 - ClickHouse 中有对应的
orders_analytical表。建表语句:
-- ClickHouse 建表
CREATE TABLE IF NOT EXISTS orders_analytical (
order_id String,
user_id String,
amount Decimal64(2),
status String,
created_at DateTime,
updated_at DateTime
) ENGINE = MergeTree()
ORDER BY (created_at, order_id)
PARTITION BY toYYYYMM(created_at);
- 安装依赖:
pip install sqlalchemy clickhouse-driver psycopg2-binary。
这段代码的核心思路是水位线增量同步——用 updated_at 字段标记上次同步位置,每次只拉取新增和变更的数据,避免全量扫描拖垮业务库。
数据流转的故障边界
分层架构引入了同步管道,也就引入了新的故障点。需要明确几个边界:
同步延迟的容忍度。 操作层写入成功后,分析层多久能看到?5 秒?1 小时?这个数字必须和业务方对齐,否则会出现"报表数据不对"的投诉。
同步失败的处理策略。 网络抖动导致一批数据没同步过去,是重试还是告警后人工介入?建议:自动重试 3 次,失败后写入死信队列,同时触发告警。
数据一致性的校验机制。 定期(比如每天凌晨)对操作层和分析层的关键表做行数 + 聚合值校验,发现偏差及时修复。下面是一个最小化的校验脚本:
# pipelines/quality_check.py
"""每日数据一致性校验"""
from sqlalchemy import create_engine, text
from clickhouse_driver import Client
pg_engine = create_engine("postgresql://user:pass@localhost:5432/biz_ops")
ch_client = Client(host="localhost", port=9000, database="biz_analytics")
def check_orders_count(date_str: str):
"""校验某日订单行数是否一致"""
# 操作层
with pg_engine.connect() as conn:
pg_count = conn.execute(text("""
SELECT COUNT(*) FROM orders
WHERE created_at::date = :date
"""), {"date": date_str}).scalar()
# 分析层
ch_count = ch_client.execute(
"SELECT COUNT(*) FROM orders_analytical WHERE toDate(created_at) = :date",
{"date": date_str}
)[0][0]
diff = pg_count - ch_count
status = "✅ PASS" if diff == 0 else f"❌ FAIL (差 {diff} 条)"
print(f"[校验] {date_str} 订单: PG={pg_count}, CH={ch_count} → {status}")
return diff == 0
if __name__ == "__main__":
from datetime import datetime, timedelta
# 校验昨天的数据
yesterday = (datetime.utcnow() - timedelta(days=1)).strftime("%Y-%m-%d")
check_orders_count(yesterday)
架构决策清单
在项目初期做数据架构规划时,可以用下面这张清单逐项确认:
| 决策项 | 要回答的问题 | 常见错误 |
|---|---|---|
| 数据域划分 | 有哪些独立的数据主题? | 所有数据混在一个库 |
| 存储选型 | 每个域的读写特征是什么? | 用 MySQL 扛所有场景 |
| 同步策略 | 哪些数据需要跨层同步?延迟容忍度? | 全量定时同步,无水位线 |
| 保留策略 | 每层数据保留多久?过期怎么清理? | 业务库无限增长 |
| 校验机制 | 怎么发现同步偏差? | 靠用户投诉发现问题 |
| 影像归档 | 历史数据怎么低成本长期留存? | 在业务库做历史查询 |
| 故障预案 | 同步管道挂了怎么办? | 无告警、无重试、无死信队列 |
每一项不需要在第一天就完美回答,但必须有意识地记录当前决策和预期演进方向。架构不是一次性的设计图,而是持续演进的决策日志。
最后一点
数据架构不是数据平台的专属话题。业务系统研发团队如果不做数据架构规划,最终会以更痛苦的方式补上这一课——系统越来越慢、数据越来越乱、报表越来越不准,而修复成本远高于初期规划成本。
从最简单的分层存储和水位线同步开始,哪怕只有操作层和分析层两层,也比所有数据堆在一个库里要好得多。