在 Google Cloud Functions 中实现基于补偿的 Saga 模式协调 MyBatis 与 Snowflake 的分布式事务


一个看似简单的业务需求摆在了面前:用户完成某个关键操作后,系统需要同时在主业务库(PostgreSQL)中更新订单状态,并向数据仓库(Snowflake)中插入一条不可变的审计日志。这两个操作必须构成一个原子单元——要么都成功,要么都失败。

第一反应是使用分布式事务,但这个方案在技术选型阶段就被直接否决。我们的执行环境是 Google Cloud Functions,一个典型的 Serverless 平台,其短暂、无状态的特性对长事务协调极不友好。更关键的是,参与方之一 Snowflake 是一个云数据仓库,其设计哲学是为分析查询(OLAP)而非事务处理(OLTP)优化的,它根本不支持 2PC(两阶段提交)协议。这意味着传统的 XA 事务方案从一开始就不在考虑范围内。

直接在函数代码里依次执行两个写操作,然后寄希望于网络和数据库不出问题,这在生产环境中是不可接受的。这种“祈祷式编程”会留下数据不一致的烂摊子,当 PostgreSQL 写入成功而 Snowflake 写入失败时,我们就丢失了关键的审计记录,反之亦然。

排除了强一致性方案后,我们只能转向最终一致性。Saga 模式是解决这类跨服务、跨数据库数据一致性问题的标准实践。它将一个长事务拆分为一系列本地事务,每个本地事务都有一个对应的补偿操作。如果任何一个本地事务失败,Saga 会反向调用之前所有已成功事务的补偿操作,从而使系统状态回滚到初始状态。

在我们的场景下,这套逻辑非常清晰:

  1. Saga 开始
  2. 事务 T1: 使用 MyBatis 在 PostgreSQL 中更新订单状态。
  3. 事务 T2: 使用 Snowflake JDBC Driver 在 Snowflake 中插入审计日志。
  4. Saga 结束

如果 T2 失败,Saga 必须执行 T1 的补偿操作 C1。C1 的逻辑就是将订单状态恢复到更新前的状态。由于 T1 本身是一个本地数据库事务,它的失败会自动回滚,无需显式补偿。棘手的是 T2 的补偿操作 C2。由于 Snowflake 的 INSERT 操作一旦提交就无法“回滚”,C2 必须是一个业务逻辑上的逆操作,比如执行一条 DELETE 语句来删除刚刚插入的审计日志。

架构决策与Saga协调器设计

在 Serverless 环境中实现 Saga,协调器的状态管理是核心难点。Cloud Function 实例是短暂的,不能在内存中维护 Saga 的执行状态。我们需要一个外部存储来持久化 Saga 的执行日志,以便在函数实例崩溃、超时或重试时能够恢复状态。Firestore 或 Cloud SQL 都是可选项,考虑到读写延迟和成本,我们选择使用同一个 PostgreSQL 库中的一张专用表来记录 Saga 日志。

我们的 Saga 协调器设计如下:

sequenceDiagram
    participant GCF as Google Cloud Function (Orchestrator)
    participant SagaLogDB as PostgreSQL (Saga Log Table)
    participant BizDB as PostgreSQL (Business DB via MyBatis)
    participant DWH as Snowflake

    GCF->>SagaLogDB: 1. 创建Saga实例, 状态: PENDING
    GCF->>BizDB: 2. 执行T1: 更新订单状态
    alt T1 成功
        GCF->>SagaLogDB: 3a. 更新Saga步骤1状态: SUCCESS
        GCF->>DWH: 4. 执行T2: 插入审计日志
        alt T2 成功
            GCF->>SagaLogDB: 5a. 更新Saga步骤2状态: SUCCESS
            GCF->>SagaLogDB: 6a. 更新Saga实例状态: COMMITTED
        else T2 失败
            GCF->>SagaLogDB: 5b. 更新Saga步骤2状态: FAILED
            GCF->>SagaLogDB: 6b. 更新Saga实例状态: COMPENSATING
            GCF->>BizDB: 7. 执行C1: 补偿T1(恢复订单状态)
            alt C1 成功
                 GCF->>SagaLogDB: 8a. 更新Saga实例状态: COMPENSATED
            else C1 失败
                 GCF->>SagaLogDB: 8b. 更新Saga实例状态: COMPENSATION_FAILED (需要人工介入)
            end
        end
    else T1 失败
        GCF->>SagaLogDB: 3b. 更新Saga步骤1状态: FAILED
        GCF->>SagaLogDB: 4b. 更新Saga实例状态: ABORTED (无需补偿)
    end

这个流程图清晰地展示了成功路径和失败补偿路径。协调器的核心职责是驱动流程、记录状态,并在失败时触发补偿逻辑。

核心代码实现

我们使用 Java 17 作为 Cloud Functions 的运行时,依赖项包括 MyBatis、PostgreSQL JDBC Driver 和 Snowflake JDBC Driver。

1. Saga 定义与参与者接口

首先,定义 Saga 流程的参与者接口。每个参与者都需要定义一个执行 (execute) 方法和一个补偿 (compensate) 方法。

// src/main/java/com/example/saga/SagaParticipant.java
package com.example.saga;

import java.sql.Connection;

/**
 * Saga 模式中的一个参与者(步骤)
 * 定义了每个步骤需要执行的业务操作和补偿操作
 * @param <T> 业务操作的输入参数类型
 */
public interface SagaParticipant<T> {

    /**
     * 获取参与者的名称,用于日志和Saga状态记录
     * @return 参与者名称
     */
    String getName();

    /**
     * 执行正向业务操作
     *
     * @param context 包含执行所需参数的上下文
     * @param connection 数据库连接,由Saga协调器提供,用于确保本地事务
     * @throws Exception 操作失败时抛出异常
     */
    void execute(SagaContext<T> context, Connection connection) throws Exception;

    /**
     * 执行补偿操作
     *
     * @param context 包含补偿所需参数的上下文
     * @param connection 数据库连接,由Saga协调器提供
     * @throws Exception 补偿失败时抛出异常
     */
    void compensate(SagaContext<T> context, Connection connection) throws Exception;
}

SagaContext 只是一个简单的容器,用于在 Saga 的不同步骤之间传递数据。

// src/main/java/com/example/saga/SagaContext.java
package com.example.saga;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class SagaContext<T> {
    private final String sagaId;
    private final T payload;
    private final Map<String, Object> attributes;

    public SagaContext(T payload) {
        this.sagaId = UUID.randomUUID().toString();
        this.payload = payload;
        this.attributes = new HashMap<>();
    }

    public String getSagaId() {
        return sagaId;
    }

    public T getPayload() {
        return payload;
    }

    public void setAttribute(String key, Object value) {
        this.attributes.put(key, value);
    }

    @SuppressWarnings("unchecked")
    public <V> V getAttribute(String key) {
        return (V) this.attributes.get(key);
    }
}

2. MyBatis 参与者实现

这个参与者负责更新 PostgreSQL 数据库。我们使用 MyBatis-Spring 集成,并通过 @Transactional 注解来管理本地事务。补偿逻辑相对简单,就是执行一个反向的 SQL 更新。

// src/main/java/com/example/participants/UpdateOrderStatusParticipant.java
package com.example.participants;

import com.example.mapper.OrderMapper;
import com.example.model.OrderUpdatePayload;
import com.example.saga.SagaContext;
import com.example.saga.SagaParticipant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import java.sql.Connection;

@Component
public class UpdateOrderStatusParticipant implements SagaParticipant<OrderUpdatePayload> {
    private static final Logger logger = LoggerFactory.getLogger(UpdateOrderStatusParticipant.class);

    private final OrderMapper orderMapper;

    public UpdateOrderStatusParticipant(OrderMapper orderMapper) {
        this.orderMapper = orderMapper;
    }

    @Override
    public String getName() {
        return "UpdateOrderStatus";
    }

    // 注意:这里的Connection参数是Saga协调器为了统一接口而设计的,
    // 但在Spring事务管理下,我们不直接使用它。事务由Spring AOP代理控制。
    @Override
    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
    public void execute(SagaContext<OrderUpdatePayload> context, Connection connection) {
        OrderUpdatePayload payload = context.getPayload();
        logger.info("SAGA [{}] EXECUTE: Updating order {} status to {}", context.getSagaId(), payload.getOrderId(), payload.getNewStatus());
        
        // 1. 获取旧状态用于补偿
        String oldStatus = orderMapper.getStatusById(payload.getOrderId());
        if (oldStatus == null) {
            throw new IllegalStateException("Order not found: " + payload.getOrderId());
        }
        context.setAttribute("oldStatus", oldStatus);

        // 2. 执行更新
        int updatedRows = orderMapper.updateStatus(payload.getOrderId(), payload.getNewStatus());
        if (updatedRows != 1) {
            // 如果更新行数不为1,说明订单不存在或状态已变更,抛出异常以触发回滚
            throw new IllegalStateException("Failed to update order status for order ID: " + payload.getOrderId());
        }
        logger.info("SAGA [{}] EXECUTE SUCCESS: Order {} status updated", context.getSagaId(), payload.getOrderId());
    }

    @Override
    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
    public void compensate(SagaContext<OrderUpdatePayload> context, Connection connection) {
        OrderUpdatePayload payload = context.getPayload();
        String oldStatus = context.getAttribute("oldStatus");
        
        if (oldStatus == null) {
            logger.warn("SAGA [{}] COMPENSATION SKIPPED: oldStatus not found in context for order {}", context.getSagaId(), payload.getOrderId());
            return;
        }

        logger.warn("SAGA [{}] COMPENSATE: Reverting order {} status to {}", context.getSagaId(), payload.getOrderId(), oldStatus);
        int updatedRows = orderMapper.updateStatus(payload.getOrderId(), oldStatus);
        if (updatedRows != 1) {
            // 补偿失败是一个严重问题,需要告警和人工干预
            logger.error("SAGA [{}] COMPENSATION FAILED: Could not revert order status for {}", context.getSagaId(), payload.getOrderId());
            throw new RuntimeException("Compensation failed for " + getName());
        }
        logger.info("SAGA [{}] COMPENSATION SUCCESS: Order {} status reverted", context.getSagaId(), payload.getOrderId());
    }
}

3. Snowflake 参与者实现

这是最具挑战性的部分。我们需要手动管理 Snowflake JDBC 连接和事务。execute 方法执行 INSERT,而 compensate 方法必须执行 DELETE。为了使 DELETE 操作幂等且精确,INSERT 时必须包含 Saga ID。

// src/main/java/com/example/participants/SnowflakeAuditParticipant.java
package com.example.participants;

import com.example.config.SnowflakeConfig;
import com.example.model.OrderUpdatePayload;
import com.example.saga.SagaContext;
import com.example.saga.SagaParticipant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;

@Component
public class SnowflakeAuditParticipant implements SagaParticipant<OrderUpdatePayload> {
    private static final Logger logger = LoggerFactory.getLogger(SnowflakeAuditParticipant.class);

    private final SnowflakeConfig snowflakeConfig;

    public SnowflakeAuditParticipant(SnowflakeConfig snowflakeConfig) {
        this.snowflakeConfig = snowflakeConfig;
    }

    @Override
    public String getName() {
        return "InsertSnowflakeAudit";
    }

    // Snowflake的JDBC驱动是事务性的,但我们需要手动控制commit/rollback
    @Override
    public void execute(SagaContext<OrderUpdatePayload> context, Connection connection) throws SQLException {
        OrderUpdatePayload payload = context.getPayload();
        String sql = "INSERT INTO audit_logs (saga_id, order_id, new_status, event_timestamp) VALUES (?, ?, ?, ?)";
        
        connection.setAutoCommit(false); // 手动管理事务

        try (PreparedStatement pstmt = connection.prepareStatement(sql)) {
            pstmt.setString(1, context.getSagaId());
            pstmt.setString(2, payload.getOrderId());
            pstmt.setString(3, payload.getNewStatus());
            pstmt.setTimestamp(4, Timestamp.from(Instant.now()));

            logger.info("SAGA [{}] EXECUTE: Inserting audit log into Snowflake for order {}", context.getSagaId(), payload.getOrderId());
            int affectedRows = pstmt.executeUpdate();
            if (affectedRows == 1) {
                connection.commit();
                logger.info("SAGA [{}] EXECUTE SUCCESS: Snowflake audit log inserted.", context.getSagaId());
            } else {
                throw new SQLException("Snowflake insert failed, no rows affected.");
            }
        } catch (SQLException e) {
            logger.error("SAGA [{}] EXECUTE FAILED: Error inserting into Snowflake. Rolling back.", context.getSagaId(), e);
            connection.rollback();
            throw e;
        }
    }

    @Override
    public void compensate(SagaContext<OrderUpdatePayload> context, Connection connection) throws SQLException {
        // 补偿操作:根据Saga ID删除之前插入的记录
        // 这是一个业务层面的回滚
        String sql = "DELETE FROM audit_logs WHERE saga_id = ?";
        
        connection.setAutoCommit(false);

        try (PreparedStatement pstmt = connection.prepareStatement(sql)) {
            pstmt.setString(1, context.getSagaId());

            logger.warn("SAGA [{}] COMPENSATE: Deleting audit log from Snowflake for saga {}", context.getSagaId(), context.getSagaId());
            int affectedRows = pstmt.executeUpdate();
            connection.commit();

            if (affectedRows == 0) {
                // 可能因为execute步骤根本没成功插入,或者已经被补偿过。这不算错误,但需要记录。
                logger.warn("SAGA [{}] COMPENSATION: No audit log found to delete for saga_id {}. It might have already been compensated or never inserted.", context.getSagaId(), context.getSagaId());
            } else {
                logger.info("SAGA [{}] COMPENSATION SUCCESS: Snowflake audit log deleted.", context.getSagaId());
            }

        } catch (SQLException e) {
            logger.error("SAGA [{}] COMPENSATION FAILED: Error deleting from Snowflake. Rolling back.", context.getSagaId(), e);
            connection.rollback();
            // 补偿失败是严重问题,需要抛出异常,触发Saga的COMPENSATION_FAILED状态
            throw e;
        }
    }
}

4. Saga 协调器 (Orchestrator)

这是所有逻辑的核心。它按顺序执行参与者,并处理异常,触发补偿流程。为了简化,这里我们不实现完整的Saga日志持久化,但在真实项目中,// TODO: Persist saga state... 注释处必须实现数据库写入。

// src/main/java/com/example/saga/SagaOrchestrator.java
package com.example.saga;

import com.example.config.SnowflakeConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import javax.sql.DataSource;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@Service
public class SagaOrchestrator {
    private static final Logger logger = LoggerFactory.getLogger(SagaOrchestrator.class);

    // 主业务库的数据源,用于执行MyBatis等参与者
    private final DataSource primaryDataSource;
    private final SnowflakeConfig snowflakeConfig;

    public SagaOrchestrator(@Qualifier("primaryDataSource") DataSource primaryDataSource, SnowflakeConfig snowflakeConfig) {
        this.primaryDataSource = primaryDataSource;
        this.snowflakeConfig = snowflakeConfig;
    }

    public <T> void execute(SagaContext<T> context, List<SagaParticipant<T>> participants) {
        List<SagaParticipant<T>> completedParticipants = new ArrayList<>();
        
        logger.info("SAGA [{}] STARTED. Participants count: {}", context.getSagaId(), participants.size());
        // TODO: Persist saga state: PENDING, sagaId, context payload

        for (SagaParticipant<T> participant : participants) {
            try {
                logger.info("SAGA [{}] Executing participant: {}", context.getSagaId(), participant.getName());
                // TODO: Persist participant state: PENDING
                
                // 根据参与者类型获取连接
                try (Connection connection = getConnectionForParticipant(participant)) {
                    participant.execute(context, connection);
                    completedParticipants.add(participant);
                    logger.info("SAGA [{}] Participant {} executed successfully", context.getSagaId(), participant.getName());
                    // TODO: Persist participant state: SUCCESS
                }

            } catch (Exception e) {
                logger.error("SAGA [{}] FAILED at participant: {}. Starting compensation.", context.getSagaId(), participant.getName(), e);
                // TODO: Persist participant state: FAILED
                // TODO: Persist saga state: COMPENSATING
                compensate(context, completedParticipants);
                return; // 终止Saga执行
            }
        }
        
        logger.info("SAGA [{}] COMMITTED successfully.", context.getSagaId());
        // TODO: Persist saga state: COMMITTED
    }

    private <T> void compensate(SagaContext<T> context, List<SagaParticipant<T>> completedParticipants) {
        Collections.reverse(completedParticipants); // 逆序补偿
        
        for (SagaParticipant<T> participant : completedParticipants) {
            try {
                logger.warn("SAGA [{}] Compensating participant: {}", context.getSagaId(), participant.getName());
                // TODO: Persist participant state: COMPENSATING
                
                try (Connection connection = getConnectionForParticipant(participant)) {
                    participant.compensate(context, connection);
                    logger.info("SAGA [{}] Participant {} compensated successfully", context.getSagaId(), participant.getName());
                    // TODO: Persist participant state: COMPENSATED
                }

            } catch (Exception e) {
                logger.error("SAGA [{}] CRITICAL: Compensation FAILED for participant: {}. Manual intervention required.",
                    context.getSagaId(), participant.getName(), e);
                // TODO: Persist saga state: COMPENSATION_FAILED
                // 此时Saga处于不一致状态,必须触发告警
                return; // 停止进一步补偿
            }
        }
        
        logger.warn("SAGA [{}] COMPENSATED successfully.", context.getSagaId());
        // TODO: Persist saga state: COMPENSATED
    }

    private Connection getConnectionForParticipant(SagaParticipant<?> participant) throws Exception {
        // 在真实项目中,这里应该基于一个更健壮的策略,比如参与者注解或类型判断
        if (participant.getName().toLowerCase().contains("snowflake")) {
            return snowflakeConfig.getConnection();
        }
        return primaryDataSource.getConnection();
    }
}

5. Google Cloud Function 入口

最后,创建 Cloud Function 的入口点,它负责接收请求、初始化 Saga 并执行。

// src/main/java/com/example/OrderUpdateFunction.java
package com.example;

import com.example.model.OrderUpdatePayload;
import com.example.participants.SnowflakeAuditParticipant;
import com.example.participants.UpdateOrderStatusParticipant;
import com.example.saga.SagaContext;
import com.example.saga.SagaOrchestrator;
import com.example.saga.SagaParticipant;
import com.google.cloud.functions.HttpFunction;
import com.google.cloud.functions.HttpRequest;
import com.google.cloud.functions.HttpResponse;
import com.google.gson.Gson;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class OrderUpdateFunction implements HttpFunction {

    private static final ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
    private static final Gson gson = new Gson();

    @Override
    public void service(HttpRequest request, HttpResponse response) throws IOException {
        // 实际项目中应有更健壮的反序列化和验证
        OrderUpdatePayload payload = gson.fromJson(request.getReader(), OrderUpdatePayload.class);

        SagaOrchestrator orchestrator = context.getBean(SagaOrchestrator.class);
        UpdateOrderStatusParticipant p1 = context.getBean(UpdateOrderStatusParticipant.class);
        SnowflakeAuditParticipant p2 = context.getBean(SnowflakeAuditParticipant.class);

        // 定义Saga的步骤顺序
        List<SagaParticipant<OrderUpdatePayload>> participants = Arrays.asList(p1, p2);
        
        SagaContext<OrderUpdatePayload> sagaContext = new SagaContext<>(payload);

        try {
            orchestrator.execute(sagaContext, participants);
            response.setStatusCode(200);
            response.getWriter().write("Saga executed for order " + payload.getOrderId());
        } catch (Exception e) {
            // Saga Orchestrator内部已经处理了补偿逻辑,这里只返回服务端错误
            response.setStatusCode(500);
            response.getWriter().write("Saga failed and compensation might have been triggered. Saga ID: " + sagaContext.getSagaId());
        }
    }
}

局限性与未来展望

这个实现虽然解决了眼下的问题,但它并非银弹,存在一些固有的局限性。

首先,系统只达到了最终一致性。在 T1 执行成功而 T2 失败到 C1 补偿完成之间,存在一个短暂的时间窗口,此时数据处于不一致状态。业务上必须能够容忍这种中间状态。

其次,补偿操作的可靠性是整个方案的基石。如果补偿操作(C1 或 C2)本身失败,Saga 将进入 COMPENSATION_FAILED 状态,数据将永久不一致,此时必须有人工介入和强大的监控告警系统来发现并修复这类问题。设计幂等的补偿操作至关重要,以应对重试场景。

再者,隔离性的缺失。由于 Saga 将长事务拆分为多个本地事务,它无法提供 ACID 中的隔离性。在 T1 执行后,其他事务可能会读到已更新但尚未最终提交(因为整个 Saga 未完成)的订单状态。这可能需要引入业务层面的锁或状态标记来规避。

最后,当前协调器的实现是同步阻塞的。对于耗时较长的参与者,这会增加 Cloud Function 的执行时间,从而提高成本并可能导致超时。一个更优化的方案是采用编排(Orchestration)与协同(Choreography)结合的方式,例如,每执行完一步就向 Pub/Sub 发布一个事件,由下一个步骤的函数订阅并执行,但这会显著增加架构的复杂性。

尽管有这些权衡,对于无法使用传统 2PC 的 Serverless 和异构数据源场景,基于补偿的 Saga 模式仍然是一个务实且可靠的工程选择。


  目录