业务系统里的数据架构:不只是画 ER 图

2026-05-18 38 预计阅读时间:1 分钟
来源:my.oschina.net AI 摘要 原文链接

免责声明:本文为 AI 摘要整理,建议结合原文阅读。摘要可能省略上下文、版本差异或边界条件,不作为官方说明。

预计阅读时间:13 分钟

很多团队一提到"数据架构",就打开绘图工具画几张 ER 图,然后觉得任务完成了。实际上,从业务系统研发的视角看,数据架构要解决的问题远不止表结构设计——它决定了数据怎么产生、怎么流转、怎么存储、怎么被消费,以及当业务规模翻倍时,系统还能不能扛住。

本文聚焦业务系统研发视角的数据架构。如果站在数据平台的角度,数据架构还会涉及数据标准、数据质量等更广泛的治理领域,但那是另一个话题。

数据架构到底管什么

在业务系统中,数据架构至少要回答四个问题:

数据从哪来? 用户输入、第三方接口、日志采集、定时任务批量导入——不同来源的数据,可靠性、格式、延迟特征完全不同。架构设计需要明确每种来源的接入方式和容错策略。

数据怎么存? 关系型数据库、文档数据库、缓存、对象存储、列式存储——选型不是"哪个技术更先进",而是"哪种存储模型匹配数据的读写特征"。高频写入的流水数据和高频读取的维度数据,存储策略应该不同。

数据怎么流转? 同步还是异步?实时还是批量?数据从一个服务到另一个服务,中间是否需要清洗、转换、聚合?流转路径决定了系统的延迟边界和故障传播范围。

数据怎么被消费? API 查询、报表导出、搜索引擎索引、机器学习特征——消费端的需求直接约束了数据的生产和存储方式。

把这四个问题串起来,就是一条完整的数据生命周期链路。数据架构的本质,是对这条链路做有意识的规划,而不是让每个模块各自为政。

一个常见的反面模式

不少项目是这样演进的:

  1. 业务初期,所有数据塞进一张大宽表,查询简单,开发快。
  2. 业务增长,宽表字段膨胀到上百个,写入变慢,索引难以维护。
  3. 为了"解耦",开始把数据同步到另一个库,但没有统一的同步机制,各团队自己写脚本,数据一致性全靠人肉核对。
  4. 报表需求来了,又从业务库直接拉数据做聚合,高峰期把业务库拖垮。

这个路径的核心问题:每一步都是局部最优,但整体没有架构约束。数据架构的价值,就是在第一步时就预留演进空间,而不是等到第四步才补窟窿。

实践:用分层模型组织业务数据

一个在业务系统中行之有效的数据架构模式是分层存储——把数据按生命周期和读写特征分成三层:

特征 典型存储 例子
操作层 高频写入、实时查询、强一致性 PostgreSQL / MySQL 订单、账户、库存
分析层 批量写入、聚合查询、容忍延迟 ClickHouse / Doris 日统计、趋势报表
影像层 低频写入、长期保留、归档查询 对象存储 + Parquet 历史订单、合规留存

下面用一个 Python 项目结构 + 数据同步配置来展示这种分层怎么落地。

项目结构示例

data-arch-demo/
├── config/
│   ├── sync_pipeline.yaml      # 数据同步配置
│   └── storage_routing.yaml    # 存储路由规则
├── models/
│   ├── operational.py          # 操作层模型定义
│   ├── analytical.py           # 分析层模型定义
│   └── archival.py             # 影像层模型定义
├── pipelines/
│   ├── op_to_analytical.py     # 操作层 → 分析层同步
│   ├── op_to_archival.py       # 操作层 → 影像层归档
│   └── quality_check.py        # 数据质量校验
└── main.py

存储路由配置

# config/storage_routing.yaml
# 定义不同数据域的存储路由规则
routing:
  order:
    operational:
      engine: postgresql
      database: biz_ops
      consistency: strong
      retention: 90d          # 操作层只保留近期数据
    analytical:
      engine: clickhouse
      database: biz_analytics
      consistency: eventual
      sync_mode: realtime     # 实时同步,延迟 < 5s
      retention: 365d
    archival:
      engine: s3_parquet      # 对象存储 + Parquet 格式
      bucket: data-archive
      partition: date         # 按日期分区
      sync_mode: daily_batch  # 每日批量归档
      retention: 7y

  user_profile:
    operational:
      engine: postgresql
      database: biz_ops
      consistency: strong
      retention: permanent    # 用户资料长期保留在操作层
    analytical:
      engine: clickhouse
      database: biz_analytics
      consistency: eventual
      sync_mode: hourly_batch
      retention: 365d
    archival:
      skip: true              # 用户资料不需要额外归档

这个配置的关键设计意图:

  • 操作层保留期短:热数据只留 90 天,避免业务库膨胀。超过 90 天的订单详情从影像层查询。
  • 同步模式按需求选:订单数据需要近实时分析,用 realtime;用户画像更新频率低,用 hourly_batch。
  • 不是所有数据都要三层:用户资料已经在操作层长期保留,归档层可以跳过。

同步管道代码

# pipelines/op_to_analytical.py
"""操作层 → 分析层:近实时同步管道"""

import time
from datetime import datetime, timedelta
from sqlalchemy import create_engine, text
from clickhouse_driver import Client

# 从配置文件加载,这里用硬编码演示
PG_URL = "postgresql://user:pass@localhost:5432/biz_ops"
CH_URL = "http://localhost:8123"

pg_engine = create_engine(PG_URL)
ch_client = Client(host="localhost", port=9000, database="biz_analytics")

SYNC_WATERMARK = {}  # 记录每张表上次同步的位置


def sync_orders(batch_size=5000, lag_seconds=10):
    """将操作层订单增量同步到分析层"""

    # 用更新时间作为水位线,避免漏同步
    cutoff = datetime.utcnow() - timedelta(seconds=lag_seconds)
    last_sync = SYNC_WATERMARK.get("orders", datetime.utcnow() - timedelta(hours=1))

    if last_sync >= cutoff:
        print("[sync] 无新数据需要同步")
        return 0

    query = text("""
        SELECT order_id, user_id, amount, status, created_at, updated_at
        FROM orders
        WHERE updated_at > :last_sync AND updated_at <= :cutoff
        ORDER BY updated_at
        LIMIT :batch_size
    """)

    with pg_engine.connect() as conn:
        rows = conn.execute(query, {
            "last_sync": last_sync,
            "cutoff": cutoff,
            "batch_size": batch_size,
        }).fetchall()

    if not rows:
        SYNC_WATERMARK["orders"] = cutoff
        return 0

    # 写入 ClickHouse
    ch_client.execute(
        "INSERT INTO orders_analytical VALUES",
        rows  # 直接传入 tuple 列表
    )

    # 推进水位线
    SYNC_WATERMARK["orders"] = cutoff
    print(f"[sync] 已同步 {len(rows)} 条订单,水位线推进到 {cutoff}")
    return len(rows)


if __name__ == "__main__":
    # 模拟持续同步循环
    while True:
        sync_orders()
        time.sleep(5)  # 每 5 秒轮询一次

运行前需要准备:

  1. PostgreSQL 中有 orders 表,包含 order_id, user_id, amount, status, created_at, updated_at 字段。
  2. ClickHouse 中有对应的 orders_analytical 表。建表语句:
-- ClickHouse 建表
CREATE TABLE IF NOT EXISTS orders_analytical (
    order_id    String,
    user_id     String,
    amount      Decimal64(2),
    status      String,
    created_at  DateTime,
    updated_at  DateTime
) ENGINE = MergeTree()
ORDER BY (created_at, order_id)
PARTITION BY toYYYYMM(created_at);
  1. 安装依赖:pip install sqlalchemy clickhouse-driver psycopg2-binary

这段代码的核心思路是水位线增量同步——用 updated_at 字段标记上次同步位置,每次只拉取新增和变更的数据,避免全量扫描拖垮业务库。

数据流转的故障边界

分层架构引入了同步管道,也就引入了新的故障点。需要明确几个边界:

同步延迟的容忍度。 操作层写入成功后,分析层多久能看到?5 秒?1 小时?这个数字必须和业务方对齐,否则会出现"报表数据不对"的投诉。

同步失败的处理策略。 网络抖动导致一批数据没同步过去,是重试还是告警后人工介入?建议:自动重试 3 次,失败后写入死信队列,同时触发告警。

数据一致性的校验机制。 定期(比如每天凌晨)对操作层和分析层的关键表做行数 + 聚合值校验,发现偏差及时修复。下面是一个最小化的校验脚本:

# pipelines/quality_check.py
"""每日数据一致性校验"""

from sqlalchemy import create_engine, text
from clickhouse_driver import Client

pg_engine = create_engine("postgresql://user:pass@localhost:5432/biz_ops")
ch_client = Client(host="localhost", port=9000, database="biz_analytics")


def check_orders_count(date_str: str):
    """校验某日订单行数是否一致"""

    # 操作层
    with pg_engine.connect() as conn:
        pg_count = conn.execute(text("""
            SELECT COUNT(*) FROM orders
            WHERE created_at::date = :date
        """), {"date": date_str}).scalar()

    # 分析层
    ch_count = ch_client.execute(
        "SELECT COUNT(*) FROM orders_analytical WHERE toDate(created_at) = :date",
        {"date": date_str}
    )[0][0]

    diff = pg_count - ch_count
    status = "✅ PASS" if diff == 0 else f"❌ FAIL (差 {diff} 条)"
    print(f"[校验] {date_str} 订单: PG={pg_count}, CH={ch_count}{status}")
    return diff == 0


if __name__ == "__main__":
    from datetime import datetime, timedelta
    # 校验昨天的数据
    yesterday = (datetime.utcnow() - timedelta(days=1)).strftime("%Y-%m-%d")
    check_orders_count(yesterday)

架构决策清单

在项目初期做数据架构规划时,可以用下面这张清单逐项确认:

决策项 要回答的问题 常见错误
数据域划分 有哪些独立的数据主题? 所有数据混在一个库
存储选型 每个域的读写特征是什么? 用 MySQL 扛所有场景
同步策略 哪些数据需要跨层同步?延迟容忍度? 全量定时同步,无水位线
保留策略 每层数据保留多久?过期怎么清理? 业务库无限增长
校验机制 怎么发现同步偏差? 靠用户投诉发现问题
影像归档 历史数据怎么低成本长期留存? 在业务库做历史查询
故障预案 同步管道挂了怎么办? 无告警、无重试、无死信队列

每一项不需要在第一天就完美回答,但必须有意识地记录当前决策和预期演进方向。架构不是一次性的设计图,而是持续演进的决策日志。

最后一点

数据架构不是数据平台的专属话题。业务系统研发团队如果不做数据架构规划,最终会以更痛苦的方式补上这一课——系统越来越慢、数据越来越乱、报表越来越不准,而修复成本远高于初期规划成本。

从最简单的分层存储和水位线同步开始,哪怕只有操作层和分析层两层,也比所有数据堆在一个库里要好得多。


相关推荐