别再纠结架构风格还是架构模式了——双驱动架构的务实选择

2026-05-28 28 预计阅读时间:1 分钟
来源:my.oschina.net AI 摘要 原文链接

免责声明:本文为 AI 摘要整理,建议结合原文阅读。摘要可能省略上下文、版本差异或边界条件,不作为官方说明。

预计阅读时间:12 分钟

每次架构评审会上,总有人抛出这个问题:"我们用的是架构风格还是架构模式?"争论一轮下来,设计图没画完,术语倒是厘清了。但系统还是那个系统,代码还是那些代码。

原文的核心观点很直接:过度区分架构风格和架构模式没有实际意义。风格也好,模式也罢,最终都是在回答同一个问题——如何从高层次指导系统构建,同时满足更多质量属性。与其在概念上划界,不如把注意力放回驱动架构的两股力量上。

两股驱动力,而不是两个标签

所谓"双驱动",指的是架构同时受两类力量塑造:

  • 业务驱动:功能需求、用户场景、领域边界。决定了系统要做什么、切成几块。
  • 质量属性驱动:可用性、可扩展性、安全性、性能、可维护性。决定了系统怎么做、做到什么程度。

架构风格(比如微服务、事件驱动)和架构模式(比如 CQRS、侧车模式)的区别,文献里常说一个是"高层次"、一个是"中层次"。但落到实际设计,你选微服务风格,必然要引入 API Gateway 模式、服务发现模式;你选事件驱动风格,自然就绑定了发布-订阅模式、最终一致性模式。它们是同一条决策链上的不同环节,硬拆成两个术语体系只会增加沟通成本。

真正值得区分的不是风格和模式,而是:当前决策主要受哪股驱动力主导?

一个决策链的完整走法

拿一个常见场景举例:订单系统需要支持高峰期 10 倍流量,同时保证订单不丢失。

业务驱动告诉你——订单是核心领域,要独立演进,不能和用户模块耦合。质量属性驱动告诉你——需要高可用、异步削峰、最终一致性。这两股力量合在一起,推你走向事件驱动 + 微服务的组合。接下来你做的每一件事,既是风格选择,也是模式选择,没必要分开命名:

决策 驱动力 传统归类
订单服务独立部署 业务边界 + 独立扩展 风格(微服务)
下单事件写入 Kafka 异步削峰 + 解耦 风格(事件驱动)
读写分离,写走主库、读走 CQRS 视图 高并发读 + 一致性容忍 模式(CQRS)
Sidecar 做限流与熔断 可用性 + 容错 模式(Sidecar)

你看,同一条链上,风格和模式交替出现,硬要分清哪一步是风格、哪一步是模式,对系统构建没有帮助。

把架构决策写进代码——一个可运行的项目骨架

与其画 UML 争论术语,不如把架构决策直接编码到项目结构和配置里。下面是一个 Python 项目的骨架,用 FastAPI 实现订单服务,同时把"事件驱动 + 微服务 + CQRS"的决策具象化。

# project layout:
# order-service/
#   app/
#     main.py            # 入口,挂载路由与事件总线
#     domain/
#       order.py          # 领域模型(纯业务逻辑,无基础设施依赖)
#     commands/
#       create_order.py   # 写端:命令处理
#     queries/
#       get_order.py      # 读端:查询处理(CQRS 分离)
#     events/
#       bus.py            # 事件总线抽象
#       kafka_bus.py      # Kafka 实现(可替换为 Redis Streams 等)
#     sidecar/
#       rate_limiter.py   # 限流中间件(Sidecar 模式的进程内实现)
#   config/
#     docker-compose.yml  # 本地开发编排
#   tests/
#     test_create_order.py

核心领域模型,只关心业务规则:

# app/domain/order.py
from dataclasses import dataclass, field
from datetime import datetime, timezone
from uuid import uuid4

@dataclass
class Order:
    order_id: str = field(default_factory=lambda: str(uuid4()))
    user_id: str = ""
    items: list = field(default_factory=list)
    status: str = "created"
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def total_price(self) -> float:
        return sum(item["price"] * item["quantity"] for item in self.items)

    def cancel(self) -> None:
        if self.status not in ("created", "pending"):
            raise ValueError(f"Cannot cancel order in status: {self.status}")
        self.status = "cancelled"

写端——命令处理,发布事件:

# app/commands/create_order.py
from app.domain.order import Order
from app.events.bus import EventBus

async def handle_create_order(user_id: str, items: list, bus: EventBus) -> Order:
    order = Order(user_id=user_id, items=items)
    # 持久化到写库(主库)
    # await write_repo.save(order)
    # 发布事件——业务驱动(通知下游)+ 质量驱动(异步解耦)
    await bus.publish("order.created", {
        "order_id": order.order_id,
        "user_id": order.user_id,
        "total_price": order.total_price(),
    })
    return order

读端——CQRS 查询,从视图读:

# app/queries/get_order.py
from app.domain.order import Order

async def get_order_by_id(order_id: str) -> dict | None:
    # 从读库(CQRS 视图)查询,不经过写库
    # 视图由事件消费者异步更新
    # await read_repo.find_by_id(order_id)
    # 以下为模拟返回
    return {
        "order_id": order_id,
        "status": "created",
        "total_price": 99.0,
    }

事件总线抽象——架构决策的锚点:

# app/events/bus.py
from abc import ABC, abstractmethod

class EventBus(ABC):
    """事件总线抽象——选择 Kafka 还是 Redis 是实现细节,不影响领域逻辑。"""
    @abstractmethod
    async def publish(self, topic: str, payload: dict) -> None: ...
    @abstractmethod
    async def subscribe(self, topic: str, handler) -> None: ...

# app/events/kafka_bus.py
import json
from app.events.bus import EventBus

class KafkaEventBus(EventBus):
    """Kafka 实现——对应'事件驱动风格'的落地,也是'发布-订阅模式'的编码。"""
    def __init__(self, bootstrap_servers: str = "localhost:9092"):
        self.bootstrap_servers = bootstrap_servers
        # 实际项目中用 aiokafka 的 AIOKafkaProducer / AIOKafkaConsumer
        # 此处省略连接细节,聚焦架构意图

    async def publish(self, topic: str, payload: dict) -> None:
        # producer = AIOKafkaProducer(bootstrap_servers=self.bootstrap_servers)
        # await producer.start()
        # await producer.send_and_wait(topic, json.dumps(payload).encode())
        print(f"[Kafka] published to {topic}: {payload}")

    async def subscribe(self, topic: str, handler) -> None:
        # consumer = AIOKafkaConsumer(topic, bootstrap_servers=self.bootstrap_servers)
        # await consumer.start()
        # async for msg in consumer: await handler(json.loads(msg.value))
        print(f"[Kafka] subscribed to {topic}")

限流中间件——Sidecar 模式的进程内版本:

# app/sidecar/rate_limiter.py
from fastapi import Request, HTTPException
from time import time

class RateLimiter:
    """简单滑动窗口限流——Sidecar 模式在进程内的轻量实现。
    生产环境建议用外部 Sidecar(如 Envoy)或 Redis 限流。"""
    def __init__(self, max_requests: int = 100, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._timestamps: dict[str, list[float]] = {}

    async def check(self, request: Request) -> None:
        client_id = request.client.host if request.client else "unknown"
        now = time()
        window = self._timestamps.setdefault(client_id, [])
        window[:] = [t for t in window if now - t < self.window_seconds]
        if len(window) >= self.max_requests:
            raise HTTPException(status_code=429, detail="Too many requests")
        window.append(now)

入口——把所有决策组装起来:

# app/main.py
from fastapi import FastAPI, Request
from app.commands.create_order import handle_create_order
from app.queries.get_order import get_order_by_id
from app.events.kafka_bus import KafkaEventBus
from app.sidecar.rate_limiter import RateLimiter

app = FastAPI(title="Order Service")
bus = KafkaEventBus()
limiter = RateLimiter(max_requests=50, window_seconds=60)

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    await limiter.check(request)
    response = await call_next(request)
    return response

@app.post("/orders")
async def create_order(user_id: str, items: list[dict]):
    order = await handle_create_order(user_id=user_id, items=items, bus=bus)
    return {"order_id": order.order_id, "status": order.status}

@app.get("/orders/{order_id}")
async def read_order(order_id: str):
    result = await get_order_by_id(order_id)
    if not result:
        from fastapi import HTTPException
        raise HTTPException(status_code=404, detail="Order not found")
    return result

本地开发编排:

# config/docker-compose.yml
version: "3.9"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  order-service:
    build: .
    ports: ["8000:8000"]
    depends_on: [kafka]
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092

运行方式:

# 启动基础设施
docker compose -f config/docker-compose.yml up -d

# 启动服务(开发模式)
cd order-service
pip install fastapi uvicorn
uvicorn app.main:app --reload --port 8000

# 测试下单
curl -X POST "http://localhost:8000/orders?user_id=u1" \
  -H "Content-Type: application/json" \
  -d '[{"name":"book","price":30,"quantity":2},{"name":"pen","price":5,"quantity":3}]'

# 测试查询
curl http://localhost:8000/orders/<返回的order_id>

这个骨架里,你能看到业务驱动(订单领域独立)和质量属性驱动(异步事件、限流、读写分离)同时作用。至于每一步叫"风格"还是"模式",代码不在乎,团队也不该在乎。

务实选择:三个检查项

下次架构评审,把术语争论替换成这三个问题:

  1. 当前决策受哪股驱动力主导? 如果是业务边界问题,先画领域上下文图;如果是可用性/性能问题,先列质量属性优先级。搞清驱动源,决策方向就不会偏。

  2. 这条决策链是否连贯? 从最高层的选择(比如事件驱动)到最具体的实现(比如 Kafka topic 设计),中间不能断裂。风格和模式是链上的环节,不是独立的分类。

  3. 决策是否可逆、可替换? 上面的 EventBus 抽象就是例子——今天用 Kafka,明天换 Redis Streams,领域逻辑不用改。把架构决策编码为接口和配置,而不是硬编码到业务代码里,这样无论你叫它风格还是模式,替换成本都很低。

过度区分术语的最大代价不是浪费时间,而是让团队误以为选了某个"风格"就自动获得了对应的质量属性。微服务风格不保证独立部署,事件驱动风格不保证高可用——质量属性靠具体的模式落地、靠代码实现、靠运维验证。风格和模式是一条路上的路标和铺路石,分开看没意义,连起来走才到终点。


相关推荐