在 Azure 环境下基于 etcd Watch 机制构建 FastAPI 集群的分布式熔断器


在 Azure 上部署由多个实例组成的 FastAPI 应用集群时,一个常见的挑战是下游服务的稳定性。如果一个关键的下游依赖项出现故障,传统的单实例熔断器无法阻止集群中的其他节点继续发起无效请求,最终导致“惊群效应”,加剧故障。一个节点熔断了,但其他九个节点还在疯狂重试,这在生产环境中是不可接受的。我们需要的是一个集群感知的、状态同步的分布式熔断器。

定义问题:分布式状态一致性的挑战

假设我们有一个 FastAPI 服务集群,部署在 Azure App Service 或一组虚拟机上,它们都依赖同一个外部 API。当该 API 性能下降或彻底宕机时,我们的目标是:

  1. 快速熔断:一旦有节点检测到连续的失败,并决定熔断,这个“熔断”状态需要被集群中的所有其他节点几乎瞬时获知。
  2. 避免脑裂:不能出现部分节点认为服务已熔断,而另一部分节点仍在尝试调用的情况。状态必须是全局一致的。
  3. 自动恢复:当熔断器进入“半开”状态时,只允许一个节点进行探测性调用。如果成功,则该节点负责将状态恢复为“关闭”,并再次通知整个集群。

方案 A:基于 Redis Pub/Sub 的广播机制

一个直接的想法是使用 Redis。当一个实例决定熔断时,它向一个特定的 Redis channel PUBLISH 一条消息。所有其他实例都 SUBSCRIBE 这个 channel,接收到消息后在本地更新其熔断器状态。

  • 优点:

    • 实现相对简单,Redis Pub/Sub 是一个成熟且广泛使用的技术。
    • 性能极高,消息广播延迟非常低。
  • 缺点:

    • 可靠性不足: Redis Pub/Sub 是“即发即弃”(fire-and-forget)模型。如果一个订阅者实例恰好在消息发布时网络抖动或正在重启,它将永久错过这次状态变更。这直接违背了我们对状态一致性的核心要求。
    • 缺乏状态持久化: Pub/Sub 本身不存储状态。如果一个新节点加入集群,它无法得知当前的全局熔断状态,直到下一次状态变更发生。我们需要一个额外的 Redis key 来存储当前状态,但这又引入了读写同步的复杂性。
    • 原子性问题: 状态的变更(例如,从 CLOSEDOPEN)和消息的发布不是原子操作。在真实项目中,这种微小的时间窗口总会在高并发下引发问题。

对于熔断器这种要求强一致性的控制平面组件,使用 Redis Pub/Sub 方案引入的风险过高。它的优势在于速度,但牺牲了分布式系统设计中至关重要的正确性。

方案 B:基于 etcd 的 Watch 与事务机制

etcd 是一个为分布式系统设计的、强一致性的键值存储。它基于 Raft 协议,专为共享配置和服务发现等场景打造。这正是我们所需要的。

  • 优点:

    • 强一致性保证: Raft 协议确保了所有 etcd 节点上的数据副本是一致的。写入操作在被集群多数节点确认后才会返回成功。
    • 可靠的 Watch 机制: 客户端可以 Watch 一个 key 或一个 key 前缀。任何对这些 key 的变更,etcd 都会保证将通知可靠地推送给所有 Watcher。即使客户端短暂断线重连,它也能获取到断线期间的所有变更事件。这完美解决了 Redis Pub/Sub 的消息丢失问题。
    • 原子操作: etcd 提供事务(Transaction)和比较并交换(Compare-And-Swap)操作。我们可以原子地检查一个 key 的当前值,并在此基础上进行修改。这对于解决多个节点同时尝试改变熔断器状态的竞态条件至关重要。
  • 缺点:

    • 部署与维护成本: 相比于一个单点的 Redis,在 Azure 上部署和维护一个高可用的 etcd 集群(通常是3或5个节点)更为复杂。
    • 写性能: 由于 Raft 协议的一致性开销,etcd 的写性能低于 Redis。但在我们的场景中,熔断状态的变更频率极低,完全可以接受。

决策: 我们选择方案 B。对于分布式系统的控制平面,正确性和一致性永远是第一位的。etcd 提供的能力与我们的问题域完美匹配。在 Azure 上,我们可以使用专用的虚拟机 SKU (如 Standard_D2s_v3) 来部署一个健壮的 etcd 集群,并配置好网络安全组规则。

核心实现概览

我们的实现将围绕三个核心组件展开:

  1. 一个本地熔断器状态机 (CircuitState)。
  2. 一个与 etcd 交互的同步器 (EtcdSynchronizer),它在后台运行,负责 Watch 状态变更并更新本地状态。
  3. 一个 FastAPI 依赖或装饰器,将熔断逻辑注入到实际的 API 调用中。

下面是整个架构的交互流程图:

sequenceDiagram
    participant Client
    participant FastAPI_Node1
    participant FastAPI_Node2
    participant DownstreamAPI
    participant EtcdCluster

    Client->>+FastAPI_Node1: 发起请求 /call-downstream
    FastAPI_Node1->>+DownstreamAPI: 尝试调用 (失败)
    DownstreamAPI-->>-FastAPI_Node1: 返回 503 Service Unavailable
    Note over FastAPI_Node1: 本地失败计数器+1, 达到阈值
    FastAPI_Node1->>+EtcdCluster: 事务写入: 将 /breakers/downstream-api 状态设为 OPEN
    EtcdCluster-->>-FastAPI_Node1: 写入成功
    EtcdCluster->>FastAPI_Node1: Watch 事件通知 (自己的写入)
    EtcdCluster->>FastAPI_Node2: Watch 事件通知 (来自Node1的写入)
    Note over FastAPI_Node1, FastAPI_Node2: 收到 etcd 通知, 更新本地熔断器为 OPEN
    
    loop 稍后
        Client->>+FastAPI_Node2: 发起请求 /call-downstream
        Note over FastAPI_Node2: 检查本地熔断器状态 (OPEN), 立即拒绝请求
        FastAPI_Node2-->>-Client: 返回 503 熔断保护
    end

关键代码与原理解析

首先,我们需要安装必要的库。

pip install fastapi uvicorn httpx python-etcd3 asyncio-glib

1. 熔断器核心逻辑与状态机

我们先定义一个纯粹的、不依赖任何外部存储的状态机。

# src/breaker/state.py
import time
from enum import Enum
from dataclasses import dataclass, field

class BreakerState(Enum):
    CLOSED = "CLOSED"
    OPEN = "OPEN"
    HALF_OPEN = "HALF_OPEN"

@dataclass
class CircuitBreakerState:
    """
    管理单个熔断器的本地状态。
    这个对象应该是线程安全的,但在 FastAPI 的 asyncio 环境中,
    只要它不在多个线程间共享,我们就不需要显式加锁。
    """
    state: BreakerState = BreakerState.CLOSED
    failure_count: int = 0
    success_count: int = 0
    last_state_change_time: float = field(default_factory=time.monotonic)

    # --- 配置项 ---
    FAILURE_THRESHOLD: int = 5
    RECOVERY_TIMEOUT_SECONDS: int = 30
    HALF_OPEN_SUCCESS_THRESHOLD: int = 2

    def is_open(self) -> bool:
        """检查熔断器是否开启,并处理从 OPEN 到 HALF_OPEN 的自动转换。"""
        if self.state == BreakerState.OPEN:
            elapsed = time.monotonic() - self.last_state_change_time
            if elapsed > self.RECOVERY_TIMEOUT_SECONDS:
                # 这里的状态转换只是本地的初步判断,最终决策由 etcd 决定
                # print(f"Breaker recovering to HALF_OPEN state locally.")
                return False # 允许探测性调用
            return True
        return False

    def record_failure(self):
        """记录一次失败。"""
        self.failure_count += 1
        self.success_count = 0
        if self.state == BreakerState.CLOSED and self.failure_count >= self.FAILURE_THRESHOLD:
            # 达到了本地阈值,但这并不直接触发状态改变,而是触发向 etcd 的写入尝试
            pass

    def record_success(self):
        """记录一次成功。"""
        self.success_count += 1
        self.failure_count = 0

    def set_state(self, new_state: BreakerState):
        """
        强制设置状态,通常由 etcd 同步器调用。
        """
        if self.state != new_state:
            print(f"State transition from {self.state.value} to {new_state.value}")
            self.state = new_state
            self.last_state_change_time = time.monotonic()
            self.failure_count = 0
            self.success_count = 0

这里的关键点在于,本地状态机的 record_failure 方法并不会直接将状态变为 OPEN。它仅仅是记录失败。状态转换的“真相之源” (Source of Truth) 将永远是 etcd。

2. etcd 同步器与 Watch 机制

这是整个系统的核心。我们使用 python-etcd3 库,并创建一个后台 asyncio 任务来持续监听 etcd 中的状态变更。

# src/breaker/synchronizer.py
import asyncio
import etcd3
import logging
from typing import Dict
from .state import CircuitBreakerState, BreakerState

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class EtcdSynchronizer:
    def __init__(self, etcd_host: str, etcd_port: int, service_name: str, local_state: CircuitBreakerState):
        self.etcd_client = etcd3.client(host=etcd_host, port=etcd_port)
        self.service_name = service_name
        self.key = f"/circuit-breakers/{self.service_name}"
        self.local_state = local_state
        self.node_id = f"fastapi-node-{id(self)}" # 简单的唯一标识符

    async def initialize_state(self):
        """应用启动时,从 etcd 拉取初始状态。"""
        try:
            value, _ = self.etcd_client.get(self.key)
            if value:
                state_str = value.decode('utf-8')
                self.local_state.set_state(BreakerState(state_str))
                logging.info(f"Initialized state from etcd for '{self.service_name}': {state_str}")
            else:
                # 如果 key 不存在,说明是第一个节点,尝试创建并设置为 CLOSED
                logging.info(f"No initial state in etcd for '{self.service_name}'. Setting to CLOSED.")
                await self.propose_state_change(BreakerState.CLOSED, force=True)
        except Exception as e:
            logging.error(f"Failed to initialize state from etcd: {e}")
            # 采用默认的 CLOSED 状态,并继续运行 Watch
            self.local_state.set_state(BreakerState.CLOSED)


    async def propose_state_change(self, new_state: BreakerState, force: bool = False) -> bool:
        """
        尝试向 etcd 提交一个状态变更。使用事务来确保原子性。
        这是防止竞态条件的关键。
        """
        try:
            if force:
                 # 强制写入,用于初始化或管理员干预
                status, _ = self.etcd_client.transaction(
                    compare=[],
                    success=[self.etcd_client.transactions.put(self.key, new_state.value)],
                    failure=[]
                )
            elif new_state == BreakerState.OPEN:
                # 只有当当前状态是 CLOSED 时,才允许变更为 OPEN
                status, _ = self.etcd_client.transaction(
                    compare=[self.etcd_client.transactions.value(self.key) == BreakerState.CLOSED.value],
                    success=[self.etcd_client.transactions.put(self.key, new_state.value)],
                    failure=[]
                )
            elif new_state == BreakerState.HALF_OPEN:
                # 只有当当前状态是 OPEN 时,才允许变更为 HALF_OPEN
                # 这个转换由探测成功的节点触发
                status, _ = self.etcd_client.transaction(
                    compare=[self.etcd_client.transactions.value(self.key) == BreakerState.OPEN.value],
                    success=[self.etcd_client.transactions.put(self.key, new_state.value)],
                    failure=[]
                )
            elif new_state == BreakerState.CLOSED:
                 # 只有当当前状态是 HALF_OPEN 时,才允许变更为 CLOSED
                status, _ = self.etcd_client.transaction(
                    compare=[self.etcd_client.transactions.value(self.key) == BreakerState.HALF_OPEN.value],
                    success=[self.etcd_client.transactions.put(self.key, new_state.value)],
                    failure=[]
                )
            else:
                status = False

            if status:
                logging.info(f"Successfully proposed state change to '{new_state.value}' for service '{self.service_name}'")
            else:
                logging.warning(f"Failed to propose state change to '{new_state.value}' for '{self.service_name}'. Another node was faster.")
            return status
        except Exception as e:
            logging.error(f"Error proposing state change to etcd: {e}")
            return False


    async def watch_for_changes(self):
        """
        核心的后台任务:持续 watch etcd key 的变化。
        """
        while True:
            try:
                events_iterator, cancel = self.etcd_client.watch(self.key)
                logging.info(f"Started watching key '{self.key}' on etcd.")
                for event in events_iterator:
                    if isinstance(event, etcd3.events.PutEvent):
                        new_state_str = event.value.decode('utf-8')
                        try:
                            new_state = BreakerState(new_state_str)
                            self.local_state.set_state(new_state)
                            logging.info(f"State updated from etcd event: '{self.service_name}' is now {new_state.value}")
                        except ValueError:
                            logging.error(f"Received invalid state from etcd: {new_state_str}")
            except Exception as e:
                logging.error(f"Watch connection to etcd failed: {e}. Reconnecting in 5 seconds...")
                await asyncio.sleep(5)

这里的 propose_state_change 是精髓。它使用了 etcd 的事务功能。例如,当一个节点想将状态从 CLOSED 变为 OPEN 时,它会发起一个事务,该事务的 compare 条件是“当且仅当 etcd 中 key 的当前值是 CLOSED 时”,success 操作才是“将 key 的值设为 OPEN”。如果此时另一个节点已经抢先一步将值改成了 OPEN,那么这个事务就会失败,从而保证了状态变更的原子性和幂等性。

3. FastAPI 集成

最后,我们将所有部分整合到 FastAPI 应用中。我们创建一个单例的熔断器管理器,并在应用启动时启动后台的 Watcher 任务。

# src/main.py
import os
import asyncio
import httpx
from fastapi import FastAPI, HTTPException, Request
from contextlib import asynccontextmanager

from .breaker.state import CircuitBreakerState, BreakerState
from .breaker.synchronizer import EtcdSynchronizer

# --- 全局熔断器实例管理 ---
# 在真实项目中,这应该是一个更复杂的工厂或管理器
# 这里为了演示,我们只管理一个
circuit_breakers: Dict[str, CircuitBreakerState] = {}
synchronizers: Dict[str, EtcdSynchronizer] = {}

# 从环境变量读取配置,这对于在 Azure App Service 或容器中运行至关重要
ETCD_HOST = os.environ.get("ETCD_HOST", "localhost")
ETCD_PORT = int(os.environ.get("ETCD_PORT", 2379))
DOWNSTREAM_SERVICE_NAME = "my_downstream_api"
DOWNSTREAM_SERVICE_URL = "http://localhost:9000/faulty" # 模拟一个会失败的下游服务

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 应用启动时执行
    # 1. 创建本地状态对象
    local_state = CircuitBreakerState()
    circuit_breakers[DOWNSTREAM_SERVICE_NAME] = local_state

    # 2. 创建并初始化 etcd 同步器
    synchronizer = EtcdSynchronizer(
        etcd_host=ETCD_HOST,
        etcd_port=ETCD_PORT,
        service_name=DOWNSTREAM_SERVICE_NAME,
        local_state=local_state
    )
    synchronizers[DOWNSTREAM_SERVICE_NAME] = synchronizer
    await synchronizer.initialize_state()

    # 3. 启动后台 watcher 任务
    watcher_task = asyncio.create_task(synchronizer.watch_for_changes())
    
    print("FastAPI application startup complete. Breaker synchronizer running.")
    yield
    # 应用关闭时执行
    watcher_task.cancel()
    print("FastAPI application shutdown. Breaker synchronizer stopped.")


app = FastAPI(lifespan=lifespan)

@app.get("/call-downstream")
async def call_downstream_service():
    breaker_state = circuit_breakers[DOWNSTREAM_SERVICE_NAME]
    synchronizer = synchronizers[DOWNSTREAM_SERVICE_NAME]

    # --- 熔断器检查 ---
    if breaker_state.is_open():
        raise HTTPException(status_code=503, detail="Service unavailable: Circuit Breaker is OPEN")

    # --- 正常或半开状态下的调用逻辑 ---
    try:
        async with httpx.AsyncClient(timeout=3.0) as client:
            response = await client.get(DOWNSTREAM_SERVICE_URL)
            response.raise_for_status()

        # 调用成功
        breaker_state.record_success()
        if breaker_state.state == BreakerState.HALF_OPEN:
            # 在半开状态下,连续成功达到阈值后,尝试关闭熔断器
            if breaker_state.success_count >= breaker_state.HALF_OPEN_SUCCESS_THRESHOLD:
                await synchronizer.propose_state_change(BreakerState.CLOSED)
        
        return {"status": "success", "data": response.json()}

    except (httpx.RequestError, httpx.HTTPStatusError) as e:
        # 调用失败
        breaker_state.record_failure()
        if breaker_state.state == BreakerState.CLOSED and breaker_state.failure_count >= breaker_state.FAILURE_THRESHOLD:
            # 失败次数达到阈值,尝试开启熔断器
            await synchronizer.propose_state_change(BreakerState.OPEN)
        
        raise HTTPException(status_code=502, detail=f"Bad Gateway: Downstream service failed. Reason: {str(e)}")


# 模拟一个不稳定的下游服务,用于测试
@app.get("/faulty")
async def faulty_service(request: Request):
    # 简单的模拟:50% 概率失败
    import random
    if random.random() > 0.5:
        return {"message": "I am working correctly!"}
    else:
        raise HTTPException(status_code=503, detail="I am intentionally failing.")

lifespan 管理器中,我们确保了 EtcdSynchronizerwatch_for_changes 方法作为一个后台任务在应用的整个生命周期中持续运行。在API端点 call_downstream_service 中,业务逻辑被熔断器包裹。失败时,它会记录失败并可能触发向 etcd 的状态变更提议;成功时,特别是在 HALF_OPEN 状态下,它会尝试恢复整个集群的状态。

架构的局限性与未来展望

这个方案虽然健壮,但也并非没有权衡。

首先,etcd 集群的运维 是一个不能忽视的成本。它自身的高可用性、备份与恢复、性能监控都需要投入资源。在选择此方案前,必须评估团队是否有能力维护一个生产级的 etcd 集群。在 Azure 上,这意味着需要配置虚拟机规模集、负载均衡器以及适当的磁盘和网络。

其次,状态同步延迟 虽然很低,但并非为零。从一个节点写入 etcd,到 Raft 协议达成共识,再到 Watch 事件推送到所有其他节点,这个过程存在毫秒级的延迟。对于绝大多数应用场景这完全可以接受,但在需要纳秒级响应的极端高频交易等领域,可能需要更特殊的硬件或软件方案。

一个可行的优化路径是引入更复杂的熔断策略。当前是基于连续失败次数,未来可以扩展为在 etcd 中存储更复杂的配置,例如基于请求错误率的百分比、动态调整的恢复超时时间等。FastAPI 节点可以 Watch 这些配置 key,从而实现整个集群熔断策略的动态热更新,而无需重新部署应用。

此外,该模式可以抽象为一个通用的高可用组件库。不仅仅是熔断器,像分布式限流、动态配置、主节点选举等功能,都可以基于 etcd 提供的强大原语来实现,为整个基于 FastAPI 的微服务体系提供统一的、可靠的分布式协调层。


  目录