一个看似简单的业务需求摆在了面前:用户完成某个关键操作后,系统需要同时在主业务库(PostgreSQL)中更新订单状态,并向数据仓库(Snowflake)中插入一条不可变的审计日志。这两个操作必须构成一个原子单元——要么都成功,要么都失败。
第一反应是使用分布式事务,但这个方案在技术选型阶段就被直接否决。我们的执行环境是 Google Cloud Functions,一个典型的 Serverless 平台,其短暂、无状态的特性对长事务协调极不友好。更关键的是,参与方之一 Snowflake 是一个云数据仓库,其设计哲学是为分析查询(OLAP)而非事务处理(OLTP)优化的,它根本不支持 2PC(两阶段提交)协议。这意味着传统的 XA 事务方案从一开始就不在考虑范围内。
直接在函数代码里依次执行两个写操作,然后寄希望于网络和数据库不出问题,这在生产环境中是不可接受的。这种“祈祷式编程”会留下数据不一致的烂摊子,当 PostgreSQL 写入成功而 Snowflake 写入失败时,我们就丢失了关键的审计记录,反之亦然。
排除了强一致性方案后,我们只能转向最终一致性。Saga 模式是解决这类跨服务、跨数据库数据一致性问题的标准实践。它将一个长事务拆分为一系列本地事务,每个本地事务都有一个对应的补偿操作。如果任何一个本地事务失败,Saga 会反向调用之前所有已成功事务的补偿操作,从而使系统状态回滚到初始状态。
在我们的场景下,这套逻辑非常清晰:
- Saga 开始
- 事务 T1: 使用 MyBatis 在 PostgreSQL 中更新订单状态。
- 事务 T2: 使用 Snowflake JDBC Driver 在 Snowflake 中插入审计日志。
- Saga 结束
如果 T2 失败,Saga 必须执行 T1 的补偿操作 C1。C1 的逻辑就是将订单状态恢复到更新前的状态。由于 T1 本身是一个本地数据库事务,它的失败会自动回滚,无需显式补偿。棘手的是 T2 的补偿操作 C2。由于 Snowflake 的 INSERT
操作一旦提交就无法“回滚”,C2 必须是一个业务逻辑上的逆操作,比如执行一条 DELETE
语句来删除刚刚插入的审计日志。
架构决策与Saga协调器设计
在 Serverless 环境中实现 Saga,协调器的状态管理是核心难点。Cloud Function 实例是短暂的,不能在内存中维护 Saga 的执行状态。我们需要一个外部存储来持久化 Saga 的执行日志,以便在函数实例崩溃、超时或重试时能够恢复状态。Firestore 或 Cloud SQL 都是可选项,考虑到读写延迟和成本,我们选择使用同一个 PostgreSQL 库中的一张专用表来记录 Saga 日志。
我们的 Saga 协调器设计如下:
sequenceDiagram participant GCF as Google Cloud Function (Orchestrator) participant SagaLogDB as PostgreSQL (Saga Log Table) participant BizDB as PostgreSQL (Business DB via MyBatis) participant DWH as Snowflake GCF->>SagaLogDB: 1. 创建Saga实例, 状态: PENDING GCF->>BizDB: 2. 执行T1: 更新订单状态 alt T1 成功 GCF->>SagaLogDB: 3a. 更新Saga步骤1状态: SUCCESS GCF->>DWH: 4. 执行T2: 插入审计日志 alt T2 成功 GCF->>SagaLogDB: 5a. 更新Saga步骤2状态: SUCCESS GCF->>SagaLogDB: 6a. 更新Saga实例状态: COMMITTED else T2 失败 GCF->>SagaLogDB: 5b. 更新Saga步骤2状态: FAILED GCF->>SagaLogDB: 6b. 更新Saga实例状态: COMPENSATING GCF->>BizDB: 7. 执行C1: 补偿T1(恢复订单状态) alt C1 成功 GCF->>SagaLogDB: 8a. 更新Saga实例状态: COMPENSATED else C1 失败 GCF->>SagaLogDB: 8b. 更新Saga实例状态: COMPENSATION_FAILED (需要人工介入) end end else T1 失败 GCF->>SagaLogDB: 3b. 更新Saga步骤1状态: FAILED GCF->>SagaLogDB: 4b. 更新Saga实例状态: ABORTED (无需补偿) end
这个流程图清晰地展示了成功路径和失败补偿路径。协调器的核心职责是驱动流程、记录状态,并在失败时触发补偿逻辑。
核心代码实现
我们使用 Java 17 作为 Cloud Functions 的运行时,依赖项包括 MyBatis、PostgreSQL JDBC Driver 和 Snowflake JDBC Driver。
1. Saga 定义与参与者接口
首先,定义 Saga 流程的参与者接口。每个参与者都需要定义一个执行 (execute
) 方法和一个补偿 (compensate
) 方法。
// src/main/java/com/example/saga/SagaParticipant.java
package com.example.saga;
import java.sql.Connection;
/**
* Saga 模式中的一个参与者(步骤)
* 定义了每个步骤需要执行的业务操作和补偿操作
* @param <T> 业务操作的输入参数类型
*/
public interface SagaParticipant<T> {
/**
* 获取参与者的名称,用于日志和Saga状态记录
* @return 参与者名称
*/
String getName();
/**
* 执行正向业务操作
*
* @param context 包含执行所需参数的上下文
* @param connection 数据库连接,由Saga协调器提供,用于确保本地事务
* @throws Exception 操作失败时抛出异常
*/
void execute(SagaContext<T> context, Connection connection) throws Exception;
/**
* 执行补偿操作
*
* @param context 包含补偿所需参数的上下文
* @param connection 数据库连接,由Saga协调器提供
* @throws Exception 补偿失败时抛出异常
*/
void compensate(SagaContext<T> context, Connection connection) throws Exception;
}
SagaContext
只是一个简单的容器,用于在 Saga 的不同步骤之间传递数据。
// src/main/java/com/example/saga/SagaContext.java
package com.example.saga;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class SagaContext<T> {
private final String sagaId;
private final T payload;
private final Map<String, Object> attributes;
public SagaContext(T payload) {
this.sagaId = UUID.randomUUID().toString();
this.payload = payload;
this.attributes = new HashMap<>();
}
public String getSagaId() {
return sagaId;
}
public T getPayload() {
return payload;
}
public void setAttribute(String key, Object value) {
this.attributes.put(key, value);
}
@SuppressWarnings("unchecked")
public <V> V getAttribute(String key) {
return (V) this.attributes.get(key);
}
}
2. MyBatis 参与者实现
这个参与者负责更新 PostgreSQL 数据库。我们使用 MyBatis-Spring 集成,并通过 @Transactional
注解来管理本地事务。补偿逻辑相对简单,就是执行一个反向的 SQL 更新。
// src/main/java/com/example/participants/UpdateOrderStatusParticipant.java
package com.example.participants;
import com.example.mapper.OrderMapper;
import com.example.model.OrderUpdatePayload;
import com.example.saga.SagaContext;
import com.example.saga.SagaParticipant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.sql.Connection;
@Component
public class UpdateOrderStatusParticipant implements SagaParticipant<OrderUpdatePayload> {
private static final Logger logger = LoggerFactory.getLogger(UpdateOrderStatusParticipant.class);
private final OrderMapper orderMapper;
public UpdateOrderStatusParticipant(OrderMapper orderMapper) {
this.orderMapper = orderMapper;
}
@Override
public String getName() {
return "UpdateOrderStatus";
}
// 注意:这里的Connection参数是Saga协调器为了统一接口而设计的,
// 但在Spring事务管理下,我们不直接使用它。事务由Spring AOP代理控制。
@Override
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public void execute(SagaContext<OrderUpdatePayload> context, Connection connection) {
OrderUpdatePayload payload = context.getPayload();
logger.info("SAGA [{}] EXECUTE: Updating order {} status to {}", context.getSagaId(), payload.getOrderId(), payload.getNewStatus());
// 1. 获取旧状态用于补偿
String oldStatus = orderMapper.getStatusById(payload.getOrderId());
if (oldStatus == null) {
throw new IllegalStateException("Order not found: " + payload.getOrderId());
}
context.setAttribute("oldStatus", oldStatus);
// 2. 执行更新
int updatedRows = orderMapper.updateStatus(payload.getOrderId(), payload.getNewStatus());
if (updatedRows != 1) {
// 如果更新行数不为1,说明订单不存在或状态已变更,抛出异常以触发回滚
throw new IllegalStateException("Failed to update order status for order ID: " + payload.getOrderId());
}
logger.info("SAGA [{}] EXECUTE SUCCESS: Order {} status updated", context.getSagaId(), payload.getOrderId());
}
@Override
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public void compensate(SagaContext<OrderUpdatePayload> context, Connection connection) {
OrderUpdatePayload payload = context.getPayload();
String oldStatus = context.getAttribute("oldStatus");
if (oldStatus == null) {
logger.warn("SAGA [{}] COMPENSATION SKIPPED: oldStatus not found in context for order {}", context.getSagaId(), payload.getOrderId());
return;
}
logger.warn("SAGA [{}] COMPENSATE: Reverting order {} status to {}", context.getSagaId(), payload.getOrderId(), oldStatus);
int updatedRows = orderMapper.updateStatus(payload.getOrderId(), oldStatus);
if (updatedRows != 1) {
// 补偿失败是一个严重问题,需要告警和人工干预
logger.error("SAGA [{}] COMPENSATION FAILED: Could not revert order status for {}", context.getSagaId(), payload.getOrderId());
throw new RuntimeException("Compensation failed for " + getName());
}
logger.info("SAGA [{}] COMPENSATION SUCCESS: Order {} status reverted", context.getSagaId(), payload.getOrderId());
}
}
3. Snowflake 参与者实现
这是最具挑战性的部分。我们需要手动管理 Snowflake JDBC 连接和事务。execute
方法执行 INSERT
,而 compensate
方法必须执行 DELETE
。为了使 DELETE
操作幂等且精确,INSERT
时必须包含 Saga ID。
// src/main/java/com/example/participants/SnowflakeAuditParticipant.java
package com.example.participants;
import com.example.config.SnowflakeConfig;
import com.example.model.OrderUpdatePayload;
import com.example.saga.SagaContext;
import com.example.saga.SagaParticipant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
@Component
public class SnowflakeAuditParticipant implements SagaParticipant<OrderUpdatePayload> {
private static final Logger logger = LoggerFactory.getLogger(SnowflakeAuditParticipant.class);
private final SnowflakeConfig snowflakeConfig;
public SnowflakeAuditParticipant(SnowflakeConfig snowflakeConfig) {
this.snowflakeConfig = snowflakeConfig;
}
@Override
public String getName() {
return "InsertSnowflakeAudit";
}
// Snowflake的JDBC驱动是事务性的,但我们需要手动控制commit/rollback
@Override
public void execute(SagaContext<OrderUpdatePayload> context, Connection connection) throws SQLException {
OrderUpdatePayload payload = context.getPayload();
String sql = "INSERT INTO audit_logs (saga_id, order_id, new_status, event_timestamp) VALUES (?, ?, ?, ?)";
connection.setAutoCommit(false); // 手动管理事务
try (PreparedStatement pstmt = connection.prepareStatement(sql)) {
pstmt.setString(1, context.getSagaId());
pstmt.setString(2, payload.getOrderId());
pstmt.setString(3, payload.getNewStatus());
pstmt.setTimestamp(4, Timestamp.from(Instant.now()));
logger.info("SAGA [{}] EXECUTE: Inserting audit log into Snowflake for order {}", context.getSagaId(), payload.getOrderId());
int affectedRows = pstmt.executeUpdate();
if (affectedRows == 1) {
connection.commit();
logger.info("SAGA [{}] EXECUTE SUCCESS: Snowflake audit log inserted.", context.getSagaId());
} else {
throw new SQLException("Snowflake insert failed, no rows affected.");
}
} catch (SQLException e) {
logger.error("SAGA [{}] EXECUTE FAILED: Error inserting into Snowflake. Rolling back.", context.getSagaId(), e);
connection.rollback();
throw e;
}
}
@Override
public void compensate(SagaContext<OrderUpdatePayload> context, Connection connection) throws SQLException {
// 补偿操作:根据Saga ID删除之前插入的记录
// 这是一个业务层面的回滚
String sql = "DELETE FROM audit_logs WHERE saga_id = ?";
connection.setAutoCommit(false);
try (PreparedStatement pstmt = connection.prepareStatement(sql)) {
pstmt.setString(1, context.getSagaId());
logger.warn("SAGA [{}] COMPENSATE: Deleting audit log from Snowflake for saga {}", context.getSagaId(), context.getSagaId());
int affectedRows = pstmt.executeUpdate();
connection.commit();
if (affectedRows == 0) {
// 可能因为execute步骤根本没成功插入,或者已经被补偿过。这不算错误,但需要记录。
logger.warn("SAGA [{}] COMPENSATION: No audit log found to delete for saga_id {}. It might have already been compensated or never inserted.", context.getSagaId(), context.getSagaId());
} else {
logger.info("SAGA [{}] COMPENSATION SUCCESS: Snowflake audit log deleted.", context.getSagaId());
}
} catch (SQLException e) {
logger.error("SAGA [{}] COMPENSATION FAILED: Error deleting from Snowflake. Rolling back.", context.getSagaId(), e);
connection.rollback();
// 补偿失败是严重问题,需要抛出异常,触发Saga的COMPENSATION_FAILED状态
throw e;
}
}
}
4. Saga 协调器 (Orchestrator)
这是所有逻辑的核心。它按顺序执行参与者,并处理异常,触发补偿流程。为了简化,这里我们不实现完整的Saga日志持久化,但在真实项目中,// TODO: Persist saga state...
注释处必须实现数据库写入。
// src/main/java/com/example/saga/SagaOrchestrator.java
package com.example.saga;
import com.example.config.SnowflakeConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import javax.sql.DataSource;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@Service
public class SagaOrchestrator {
private static final Logger logger = LoggerFactory.getLogger(SagaOrchestrator.class);
// 主业务库的数据源,用于执行MyBatis等参与者
private final DataSource primaryDataSource;
private final SnowflakeConfig snowflakeConfig;
public SagaOrchestrator(@Qualifier("primaryDataSource") DataSource primaryDataSource, SnowflakeConfig snowflakeConfig) {
this.primaryDataSource = primaryDataSource;
this.snowflakeConfig = snowflakeConfig;
}
public <T> void execute(SagaContext<T> context, List<SagaParticipant<T>> participants) {
List<SagaParticipant<T>> completedParticipants = new ArrayList<>();
logger.info("SAGA [{}] STARTED. Participants count: {}", context.getSagaId(), participants.size());
// TODO: Persist saga state: PENDING, sagaId, context payload
for (SagaParticipant<T> participant : participants) {
try {
logger.info("SAGA [{}] Executing participant: {}", context.getSagaId(), participant.getName());
// TODO: Persist participant state: PENDING
// 根据参与者类型获取连接
try (Connection connection = getConnectionForParticipant(participant)) {
participant.execute(context, connection);
completedParticipants.add(participant);
logger.info("SAGA [{}] Participant {} executed successfully", context.getSagaId(), participant.getName());
// TODO: Persist participant state: SUCCESS
}
} catch (Exception e) {
logger.error("SAGA [{}] FAILED at participant: {}. Starting compensation.", context.getSagaId(), participant.getName(), e);
// TODO: Persist participant state: FAILED
// TODO: Persist saga state: COMPENSATING
compensate(context, completedParticipants);
return; // 终止Saga执行
}
}
logger.info("SAGA [{}] COMMITTED successfully.", context.getSagaId());
// TODO: Persist saga state: COMMITTED
}
private <T> void compensate(SagaContext<T> context, List<SagaParticipant<T>> completedParticipants) {
Collections.reverse(completedParticipants); // 逆序补偿
for (SagaParticipant<T> participant : completedParticipants) {
try {
logger.warn("SAGA [{}] Compensating participant: {}", context.getSagaId(), participant.getName());
// TODO: Persist participant state: COMPENSATING
try (Connection connection = getConnectionForParticipant(participant)) {
participant.compensate(context, connection);
logger.info("SAGA [{}] Participant {} compensated successfully", context.getSagaId(), participant.getName());
// TODO: Persist participant state: COMPENSATED
}
} catch (Exception e) {
logger.error("SAGA [{}] CRITICAL: Compensation FAILED for participant: {}. Manual intervention required.",
context.getSagaId(), participant.getName(), e);
// TODO: Persist saga state: COMPENSATION_FAILED
// 此时Saga处于不一致状态,必须触发告警
return; // 停止进一步补偿
}
}
logger.warn("SAGA [{}] COMPENSATED successfully.", context.getSagaId());
// TODO: Persist saga state: COMPENSATED
}
private Connection getConnectionForParticipant(SagaParticipant<?> participant) throws Exception {
// 在真实项目中,这里应该基于一个更健壮的策略,比如参与者注解或类型判断
if (participant.getName().toLowerCase().contains("snowflake")) {
return snowflakeConfig.getConnection();
}
return primaryDataSource.getConnection();
}
}
5. Google Cloud Function 入口
最后,创建 Cloud Function 的入口点,它负责接收请求、初始化 Saga 并执行。
// src/main/java/com/example/OrderUpdateFunction.java
package com.example;
import com.example.model.OrderUpdatePayload;
import com.example.participants.SnowflakeAuditParticipant;
import com.example.participants.UpdateOrderStatusParticipant;
import com.example.saga.SagaContext;
import com.example.saga.SagaOrchestrator;
import com.example.saga.SagaParticipant;
import com.google.cloud.functions.HttpFunction;
import com.google.cloud.functions.HttpRequest;
import com.google.cloud.functions.HttpResponse;
import com.google.gson.Gson;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
public class OrderUpdateFunction implements HttpFunction {
private static final ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
private static final Gson gson = new Gson();
@Override
public void service(HttpRequest request, HttpResponse response) throws IOException {
// 实际项目中应有更健壮的反序列化和验证
OrderUpdatePayload payload = gson.fromJson(request.getReader(), OrderUpdatePayload.class);
SagaOrchestrator orchestrator = context.getBean(SagaOrchestrator.class);
UpdateOrderStatusParticipant p1 = context.getBean(UpdateOrderStatusParticipant.class);
SnowflakeAuditParticipant p2 = context.getBean(SnowflakeAuditParticipant.class);
// 定义Saga的步骤顺序
List<SagaParticipant<OrderUpdatePayload>> participants = Arrays.asList(p1, p2);
SagaContext<OrderUpdatePayload> sagaContext = new SagaContext<>(payload);
try {
orchestrator.execute(sagaContext, participants);
response.setStatusCode(200);
response.getWriter().write("Saga executed for order " + payload.getOrderId());
} catch (Exception e) {
// Saga Orchestrator内部已经处理了补偿逻辑,这里只返回服务端错误
response.setStatusCode(500);
response.getWriter().write("Saga failed and compensation might have been triggered. Saga ID: " + sagaContext.getSagaId());
}
}
}
局限性与未来展望
这个实现虽然解决了眼下的问题,但它并非银弹,存在一些固有的局限性。
首先,系统只达到了最终一致性。在 T1 执行成功而 T2 失败到 C1 补偿完成之间,存在一个短暂的时间窗口,此时数据处于不一致状态。业务上必须能够容忍这种中间状态。
其次,补偿操作的可靠性是整个方案的基石。如果补偿操作(C1 或 C2)本身失败,Saga 将进入 COMPENSATION_FAILED
状态,数据将永久不一致,此时必须有人工介入和强大的监控告警系统来发现并修复这类问题。设计幂等的补偿操作至关重要,以应对重试场景。
再者,隔离性的缺失。由于 Saga 将长事务拆分为多个本地事务,它无法提供 ACID 中的隔离性。在 T1 执行后,其他事务可能会读到已更新但尚未最终提交(因为整个 Saga 未完成)的订单状态。这可能需要引入业务层面的锁或状态标记来规避。
最后,当前协调器的实现是同步阻塞的。对于耗时较长的参与者,这会增加 Cloud Function 的执行时间,从而提高成本并可能导致超时。一个更优化的方案是采用编排(Orchestration)与协同(Choreography)结合的方式,例如,每执行完一步就向 Pub/Sub 发布一个事件,由下一个步骤的函数订阅并执行,但这会显著增加架构的复杂性。
尽管有这些权衡,对于无法使用传统 2PC 的 Serverless 和异构数据源场景,基于补偿的 Saga 模式仍然是一个务实且可靠的工程选择。