在 Azure 上部署由多个实例组成的 FastAPI 应用集群时,一个常见的挑战是下游服务的稳定性。如果一个关键的下游依赖项出现故障,传统的单实例熔断器无法阻止集群中的其他节点继续发起无效请求,最终导致“惊群效应”,加剧故障。一个节点熔断了,但其他九个节点还在疯狂重试,这在生产环境中是不可接受的。我们需要的是一个集群感知的、状态同步的分布式熔断器。
定义问题:分布式状态一致性的挑战
假设我们有一个 FastAPI 服务集群,部署在 Azure App Service 或一组虚拟机上,它们都依赖同一个外部 API。当该 API 性能下降或彻底宕机时,我们的目标是:
- 快速熔断:一旦有节点检测到连续的失败,并决定熔断,这个“熔断”状态需要被集群中的所有其他节点几乎瞬时获知。
- 避免脑裂:不能出现部分节点认为服务已熔断,而另一部分节点仍在尝试调用的情况。状态必须是全局一致的。
- 自动恢复:当熔断器进入“半开”状态时,只允许一个节点进行探测性调用。如果成功,则该节点负责将状态恢复为“关闭”,并再次通知整个集群。
方案 A:基于 Redis Pub/Sub 的广播机制
一个直接的想法是使用 Redis。当一个实例决定熔断时,它向一个特定的 Redis channel PUBLISH
一条消息。所有其他实例都 SUBSCRIBE
这个 channel,接收到消息后在本地更新其熔断器状态。
优点:
- 实现相对简单,Redis Pub/Sub 是一个成熟且广泛使用的技术。
- 性能极高,消息广播延迟非常低。
缺点:
- 可靠性不足: Redis Pub/Sub 是“即发即弃”(fire-and-forget)模型。如果一个订阅者实例恰好在消息发布时网络抖动或正在重启,它将永久错过这次状态变更。这直接违背了我们对状态一致性的核心要求。
- 缺乏状态持久化: Pub/Sub 本身不存储状态。如果一个新节点加入集群,它无法得知当前的全局熔断状态,直到下一次状态变更发生。我们需要一个额外的 Redis key 来存储当前状态,但这又引入了读写同步的复杂性。
- 原子性问题: 状态的变更(例如,从
CLOSED
到OPEN
)和消息的发布不是原子操作。在真实项目中,这种微小的时间窗口总会在高并发下引发问题。
对于熔断器这种要求强一致性的控制平面组件,使用 Redis Pub/Sub 方案引入的风险过高。它的优势在于速度,但牺牲了分布式系统设计中至关重要的正确性。
方案 B:基于 etcd 的 Watch 与事务机制
etcd 是一个为分布式系统设计的、强一致性的键值存储。它基于 Raft 协议,专为共享配置和服务发现等场景打造。这正是我们所需要的。
优点:
- 强一致性保证: Raft 协议确保了所有 etcd 节点上的数据副本是一致的。写入操作在被集群多数节点确认后才会返回成功。
- 可靠的 Watch 机制: 客户端可以
Watch
一个 key 或一个 key 前缀。任何对这些 key 的变更,etcd 都会保证将通知可靠地推送给所有Watcher
。即使客户端短暂断线重连,它也能获取到断线期间的所有变更事件。这完美解决了 Redis Pub/Sub 的消息丢失问题。 - 原子操作: etcd 提供事务(Transaction)和比较并交换(Compare-And-Swap)操作。我们可以原子地检查一个 key 的当前值,并在此基础上进行修改。这对于解决多个节点同时尝试改变熔断器状态的竞态条件至关重要。
缺点:
- 部署与维护成本: 相比于一个单点的 Redis,在 Azure 上部署和维护一个高可用的 etcd 集群(通常是3或5个节点)更为复杂。
- 写性能: 由于 Raft 协议的一致性开销,etcd 的写性能低于 Redis。但在我们的场景中,熔断状态的变更频率极低,完全可以接受。
决策: 我们选择方案 B。对于分布式系统的控制平面,正确性和一致性永远是第一位的。etcd 提供的能力与我们的问题域完美匹配。在 Azure 上,我们可以使用专用的虚拟机 SKU (如 Standard_D2s_v3
) 来部署一个健壮的 etcd 集群,并配置好网络安全组规则。
核心实现概览
我们的实现将围绕三个核心组件展开:
- 一个本地熔断器状态机 (
CircuitState
)。 - 一个与 etcd 交互的同步器 (
EtcdSynchronizer
),它在后台运行,负责 Watch 状态变更并更新本地状态。 - 一个 FastAPI 依赖或装饰器,将熔断逻辑注入到实际的 API 调用中。
下面是整个架构的交互流程图:
sequenceDiagram participant Client participant FastAPI_Node1 participant FastAPI_Node2 participant DownstreamAPI participant EtcdCluster Client->>+FastAPI_Node1: 发起请求 /call-downstream FastAPI_Node1->>+DownstreamAPI: 尝试调用 (失败) DownstreamAPI-->>-FastAPI_Node1: 返回 503 Service Unavailable Note over FastAPI_Node1: 本地失败计数器+1, 达到阈值 FastAPI_Node1->>+EtcdCluster: 事务写入: 将 /breakers/downstream-api 状态设为 OPEN EtcdCluster-->>-FastAPI_Node1: 写入成功 EtcdCluster->>FastAPI_Node1: Watch 事件通知 (自己的写入) EtcdCluster->>FastAPI_Node2: Watch 事件通知 (来自Node1的写入) Note over FastAPI_Node1, FastAPI_Node2: 收到 etcd 通知, 更新本地熔断器为 OPEN loop 稍后 Client->>+FastAPI_Node2: 发起请求 /call-downstream Note over FastAPI_Node2: 检查本地熔断器状态 (OPEN), 立即拒绝请求 FastAPI_Node2-->>-Client: 返回 503 熔断保护 end
关键代码与原理解析
首先,我们需要安装必要的库。
pip install fastapi uvicorn httpx python-etcd3 asyncio-glib
1. 熔断器核心逻辑与状态机
我们先定义一个纯粹的、不依赖任何外部存储的状态机。
# src/breaker/state.py
import time
from enum import Enum
from dataclasses import dataclass, field
class BreakerState(Enum):
CLOSED = "CLOSED"
OPEN = "OPEN"
HALF_OPEN = "HALF_OPEN"
@dataclass
class CircuitBreakerState:
"""
管理单个熔断器的本地状态。
这个对象应该是线程安全的,但在 FastAPI 的 asyncio 环境中,
只要它不在多个线程间共享,我们就不需要显式加锁。
"""
state: BreakerState = BreakerState.CLOSED
failure_count: int = 0
success_count: int = 0
last_state_change_time: float = field(default_factory=time.monotonic)
# --- 配置项 ---
FAILURE_THRESHOLD: int = 5
RECOVERY_TIMEOUT_SECONDS: int = 30
HALF_OPEN_SUCCESS_THRESHOLD: int = 2
def is_open(self) -> bool:
"""检查熔断器是否开启,并处理从 OPEN 到 HALF_OPEN 的自动转换。"""
if self.state == BreakerState.OPEN:
elapsed = time.monotonic() - self.last_state_change_time
if elapsed > self.RECOVERY_TIMEOUT_SECONDS:
# 这里的状态转换只是本地的初步判断,最终决策由 etcd 决定
# print(f"Breaker recovering to HALF_OPEN state locally.")
return False # 允许探测性调用
return True
return False
def record_failure(self):
"""记录一次失败。"""
self.failure_count += 1
self.success_count = 0
if self.state == BreakerState.CLOSED and self.failure_count >= self.FAILURE_THRESHOLD:
# 达到了本地阈值,但这并不直接触发状态改变,而是触发向 etcd 的写入尝试
pass
def record_success(self):
"""记录一次成功。"""
self.success_count += 1
self.failure_count = 0
def set_state(self, new_state: BreakerState):
"""
强制设置状态,通常由 etcd 同步器调用。
"""
if self.state != new_state:
print(f"State transition from {self.state.value} to {new_state.value}")
self.state = new_state
self.last_state_change_time = time.monotonic()
self.failure_count = 0
self.success_count = 0
这里的关键点在于,本地状态机的 record_failure
方法并不会直接将状态变为 OPEN
。它仅仅是记录失败。状态转换的“真相之源” (Source of Truth) 将永远是 etcd。
2. etcd 同步器与 Watch 机制
这是整个系统的核心。我们使用 python-etcd3
库,并创建一个后台 asyncio
任务来持续监听 etcd 中的状态变更。
# src/breaker/synchronizer.py
import asyncio
import etcd3
import logging
from typing import Dict
from .state import CircuitBreakerState, BreakerState
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class EtcdSynchronizer:
def __init__(self, etcd_host: str, etcd_port: int, service_name: str, local_state: CircuitBreakerState):
self.etcd_client = etcd3.client(host=etcd_host, port=etcd_port)
self.service_name = service_name
self.key = f"/circuit-breakers/{self.service_name}"
self.local_state = local_state
self.node_id = f"fastapi-node-{id(self)}" # 简单的唯一标识符
async def initialize_state(self):
"""应用启动时,从 etcd 拉取初始状态。"""
try:
value, _ = self.etcd_client.get(self.key)
if value:
state_str = value.decode('utf-8')
self.local_state.set_state(BreakerState(state_str))
logging.info(f"Initialized state from etcd for '{self.service_name}': {state_str}")
else:
# 如果 key 不存在,说明是第一个节点,尝试创建并设置为 CLOSED
logging.info(f"No initial state in etcd for '{self.service_name}'. Setting to CLOSED.")
await self.propose_state_change(BreakerState.CLOSED, force=True)
except Exception as e:
logging.error(f"Failed to initialize state from etcd: {e}")
# 采用默认的 CLOSED 状态,并继续运行 Watch
self.local_state.set_state(BreakerState.CLOSED)
async def propose_state_change(self, new_state: BreakerState, force: bool = False) -> bool:
"""
尝试向 etcd 提交一个状态变更。使用事务来确保原子性。
这是防止竞态条件的关键。
"""
try:
if force:
# 强制写入,用于初始化或管理员干预
status, _ = self.etcd_client.transaction(
compare=[],
success=[self.etcd_client.transactions.put(self.key, new_state.value)],
failure=[]
)
elif new_state == BreakerState.OPEN:
# 只有当当前状态是 CLOSED 时,才允许变更为 OPEN
status, _ = self.etcd_client.transaction(
compare=[self.etcd_client.transactions.value(self.key) == BreakerState.CLOSED.value],
success=[self.etcd_client.transactions.put(self.key, new_state.value)],
failure=[]
)
elif new_state == BreakerState.HALF_OPEN:
# 只有当当前状态是 OPEN 时,才允许变更为 HALF_OPEN
# 这个转换由探测成功的节点触发
status, _ = self.etcd_client.transaction(
compare=[self.etcd_client.transactions.value(self.key) == BreakerState.OPEN.value],
success=[self.etcd_client.transactions.put(self.key, new_state.value)],
failure=[]
)
elif new_state == BreakerState.CLOSED:
# 只有当当前状态是 HALF_OPEN 时,才允许变更为 CLOSED
status, _ = self.etcd_client.transaction(
compare=[self.etcd_client.transactions.value(self.key) == BreakerState.HALF_OPEN.value],
success=[self.etcd_client.transactions.put(self.key, new_state.value)],
failure=[]
)
else:
status = False
if status:
logging.info(f"Successfully proposed state change to '{new_state.value}' for service '{self.service_name}'")
else:
logging.warning(f"Failed to propose state change to '{new_state.value}' for '{self.service_name}'. Another node was faster.")
return status
except Exception as e:
logging.error(f"Error proposing state change to etcd: {e}")
return False
async def watch_for_changes(self):
"""
核心的后台任务:持续 watch etcd key 的变化。
"""
while True:
try:
events_iterator, cancel = self.etcd_client.watch(self.key)
logging.info(f"Started watching key '{self.key}' on etcd.")
for event in events_iterator:
if isinstance(event, etcd3.events.PutEvent):
new_state_str = event.value.decode('utf-8')
try:
new_state = BreakerState(new_state_str)
self.local_state.set_state(new_state)
logging.info(f"State updated from etcd event: '{self.service_name}' is now {new_state.value}")
except ValueError:
logging.error(f"Received invalid state from etcd: {new_state_str}")
except Exception as e:
logging.error(f"Watch connection to etcd failed: {e}. Reconnecting in 5 seconds...")
await asyncio.sleep(5)
这里的 propose_state_change
是精髓。它使用了 etcd 的事务功能。例如,当一个节点想将状态从 CLOSED
变为 OPEN
时,它会发起一个事务,该事务的 compare
条件是“当且仅当 etcd 中 key 的当前值是 CLOSED
时”,success
操作才是“将 key 的值设为 OPEN
”。如果此时另一个节点已经抢先一步将值改成了 OPEN
,那么这个事务就会失败,从而保证了状态变更的原子性和幂等性。
3. FastAPI 集成
最后,我们将所有部分整合到 FastAPI 应用中。我们创建一个单例的熔断器管理器,并在应用启动时启动后台的 Watcher 任务。
# src/main.py
import os
import asyncio
import httpx
from fastapi import FastAPI, HTTPException, Request
from contextlib import asynccontextmanager
from .breaker.state import CircuitBreakerState, BreakerState
from .breaker.synchronizer import EtcdSynchronizer
# --- 全局熔断器实例管理 ---
# 在真实项目中,这应该是一个更复杂的工厂或管理器
# 这里为了演示,我们只管理一个
circuit_breakers: Dict[str, CircuitBreakerState] = {}
synchronizers: Dict[str, EtcdSynchronizer] = {}
# 从环境变量读取配置,这对于在 Azure App Service 或容器中运行至关重要
ETCD_HOST = os.environ.get("ETCD_HOST", "localhost")
ETCD_PORT = int(os.environ.get("ETCD_PORT", 2379))
DOWNSTREAM_SERVICE_NAME = "my_downstream_api"
DOWNSTREAM_SERVICE_URL = "http://localhost:9000/faulty" # 模拟一个会失败的下游服务
@asynccontextmanager
async def lifespan(app: FastAPI):
# 应用启动时执行
# 1. 创建本地状态对象
local_state = CircuitBreakerState()
circuit_breakers[DOWNSTREAM_SERVICE_NAME] = local_state
# 2. 创建并初始化 etcd 同步器
synchronizer = EtcdSynchronizer(
etcd_host=ETCD_HOST,
etcd_port=ETCD_PORT,
service_name=DOWNSTREAM_SERVICE_NAME,
local_state=local_state
)
synchronizers[DOWNSTREAM_SERVICE_NAME] = synchronizer
await synchronizer.initialize_state()
# 3. 启动后台 watcher 任务
watcher_task = asyncio.create_task(synchronizer.watch_for_changes())
print("FastAPI application startup complete. Breaker synchronizer running.")
yield
# 应用关闭时执行
watcher_task.cancel()
print("FastAPI application shutdown. Breaker synchronizer stopped.")
app = FastAPI(lifespan=lifespan)
@app.get("/call-downstream")
async def call_downstream_service():
breaker_state = circuit_breakers[DOWNSTREAM_SERVICE_NAME]
synchronizer = synchronizers[DOWNSTREAM_SERVICE_NAME]
# --- 熔断器检查 ---
if breaker_state.is_open():
raise HTTPException(status_code=503, detail="Service unavailable: Circuit Breaker is OPEN")
# --- 正常或半开状态下的调用逻辑 ---
try:
async with httpx.AsyncClient(timeout=3.0) as client:
response = await client.get(DOWNSTREAM_SERVICE_URL)
response.raise_for_status()
# 调用成功
breaker_state.record_success()
if breaker_state.state == BreakerState.HALF_OPEN:
# 在半开状态下,连续成功达到阈值后,尝试关闭熔断器
if breaker_state.success_count >= breaker_state.HALF_OPEN_SUCCESS_THRESHOLD:
await synchronizer.propose_state_change(BreakerState.CLOSED)
return {"status": "success", "data": response.json()}
except (httpx.RequestError, httpx.HTTPStatusError) as e:
# 调用失败
breaker_state.record_failure()
if breaker_state.state == BreakerState.CLOSED and breaker_state.failure_count >= breaker_state.FAILURE_THRESHOLD:
# 失败次数达到阈值,尝试开启熔断器
await synchronizer.propose_state_change(BreakerState.OPEN)
raise HTTPException(status_code=502, detail=f"Bad Gateway: Downstream service failed. Reason: {str(e)}")
# 模拟一个不稳定的下游服务,用于测试
@app.get("/faulty")
async def faulty_service(request: Request):
# 简单的模拟:50% 概率失败
import random
if random.random() > 0.5:
return {"message": "I am working correctly!"}
else:
raise HTTPException(status_code=503, detail="I am intentionally failing.")
在 lifespan
管理器中,我们确保了 EtcdSynchronizer
的 watch_for_changes
方法作为一个后台任务在应用的整个生命周期中持续运行。在API端点 call_downstream_service
中,业务逻辑被熔断器包裹。失败时,它会记录失败并可能触发向 etcd 的状态变更提议;成功时,特别是在 HALF_OPEN
状态下,它会尝试恢复整个集群的状态。
架构的局限性与未来展望
这个方案虽然健壮,但也并非没有权衡。
首先,etcd 集群的运维 是一个不能忽视的成本。它自身的高可用性、备份与恢复、性能监控都需要投入资源。在选择此方案前,必须评估团队是否有能力维护一个生产级的 etcd 集群。在 Azure 上,这意味着需要配置虚拟机规模集、负载均衡器以及适当的磁盘和网络。
其次,状态同步延迟 虽然很低,但并非为零。从一个节点写入 etcd,到 Raft 协议达成共识,再到 Watch 事件推送到所有其他节点,这个过程存在毫秒级的延迟。对于绝大多数应用场景这完全可以接受,但在需要纳秒级响应的极端高频交易等领域,可能需要更特殊的硬件或软件方案。
一个可行的优化路径是引入更复杂的熔断策略。当前是基于连续失败次数,未来可以扩展为在 etcd 中存储更复杂的配置,例如基于请求错误率的百分比、动态调整的恢复超时时间等。FastAPI 节点可以 Watch 这些配置 key,从而实现整个集群熔断策略的动态热更新,而无需重新部署应用。
此外,该模式可以抽象为一个通用的高可用组件库。不仅仅是熔断器,像分布式限流、动态配置、主节点选举等功能,都可以基于 etcd 提供的强大原语来实现,为整个基于 FastAPI 的微服务体系提供统一的、可靠的分布式协调层。