构建结合 DynamoDB 与 SQL 的混合特征存储并通过 BentoML 提供实时服务


我们的机器学习团队遇到了一个日益棘手的窘境:特征分裂。一部分特征是实时生成的,比如用户最后一次点击、当前会话时长,必须在10毫秒内可供模型调用;另一部分则是离线计算的批量特征,例如用户过去7天的平均购买金额、历史浏览品类分布,这些通过复杂的 SQL 在数据仓库中每日更新。模型需要同时消费这两种特征,而我们现有的架构是一堆脆弱的 Python 脚本和不稳定的缓存层,维护成本和故障率都高得无法接受。

最初的构想是构建一个统一的特征存储(Feature Store)。但这不仅仅是一个技术问题,更是一个流程问题。一个新特征从提出、开发、验证、上线到最终废弃,整个生命周期缺乏管理。特征的生产者(数据工程师)和消费者(算法工程师)之间存在巨大的沟通鸿沟。这让我联想到了软件开发中的看板(Kanban)方法。如果我们可以将特征本身视为一个工作项,用看板来追踪其状态,并让状态的变更自动触发相应的技术流程,或许能同时解决技术和流程两大难题。

这个想法催生了一个新的架构目标:构建一个由看板驱动的、混合了实时与批量数据源的特征服务

技术选型决策

这个架构的核心是数据存储和模型服务,我的选型思考如下:

  1. 在线存储 - Amazon DynamoDB: 需求非常明确:超低延迟的键值查询。我们需要通过用户ID在个位数毫秒内获取其实时特征。DynamoDB 的性能、可扩展性和完全托管的特性使其成为不二之选。关系型数据库在这里会因为连接开销和复杂的查询计划而败下阵来。

  2. 离线存储 - SQL 数据仓库: 我们的批量特征已经存在于一个基于 SQL 的数据仓库(例如 Snowflake 或 Redshift)中。让数据工程师继续使用他们熟悉的 SQL 进行复杂的聚合、窗口函数等操作是最高效的方式。我们不需要迁移这些逻辑。

  3. 模型服务 - BentoML: 为什么不用 FastAPI 或 Flask 直接包装模型?因为我们的“服务”远不止模型推理。它需要编排数据流:接收请求,并行地从 DynamoDB 和 SQL 数据仓库拉取特征,将它们融合成一个特征向量,然后才送入模型。BentoML 对这种复杂服务逻辑的抽象、依赖管理和部署打包能力,远超一个简单的 Web 框架。它可以将数据预处理、模型推理和后处理逻辑优雅地封装在一起。

  4. 流程管理 - Kanban 概念: 我不打算引入一个完整的 Jira 或 Trello 系统,而是将看板的核心思想——状态——直接融入我们的基础设施。每个特征都会有一个元数据记录,包含它的当前状态(如 BACKLOG, DEVELOPING, STAGING, PRODUCTION, DEPRECATED)。状态的变更是通过一个简单的 CLI 工具完成,在真实的 CI/CD 流程中,这个 CLI 命令会由 Jenkins 或 GitHub Actions 在特定阶段(如合并到主分支)自动调用。

架构实现步骤

整个系统分为两个主要流程:离线数据ETL和在线实时服务。

graph TD
    subgraph "离线ETL流程 (每日执行)"
        A[SQL数据仓库] -- 1. 复杂SQL聚合 --> B(特征计算任务);
        B -- 2. 生成Parquet文件 --> C[S3存储桶];
        C -- 3. 触发同步任务 --> D(Python同步脚本);
        D -- 4. 批量写入 --> E[DynamoDB Online Store];
    end

    subgraph "在线服务流程 (实时响应)"
        F[用户请求,含user_id] --> G{BentoML服务};
        G -- 5a. 并行查询 --> E;
        G -- 5b. 并行查询 --> A;
        subgraph "特征融合"
          E -- 实时特征 --> H;
          A -- 批量/准实时特征 --> H;
        end
        H[合并特征向量] --> I[模型推理];
        I --> J[返回预测结果];
    end

1. DynamoDB 的混合数据建模

在 DynamoDB 中,我选择使用单一表设计,这能最大化灵活性。表的主键由分区键(PK)和排序键(SK)组成。

  • 表名: feature_store_hybrid
  • 主键: PK (String), SK (String)

我们将用它来存储两种完全不同的数据:用户特征和特征元数据。

  • 存储用户特征:

    • PK: USER#{user_id} (例如: USER#12345)
    • SK: FEATURES#{feature_group} (例如: FEATURES#REALTIMEFEATURES#BATCH_DAILY)
    • 其他属性: 特征名和特征值,例如 last_click_ts, session_duration_sec
  • 存储特征元数据 (我们的“Kanban”状态):

    • PK: FEATURE_META#{feature_name} (例如: FEATURE_META#avg_spend_7d)
    • SK: METADATA
    • 其他属性: status (PRODUCTION, DEPRECATED), description, owner, update_ts

这种模式的优势在于,我们可以用一个表同时服务于数据读取和系统管理。

2. 离线 SQL 计算与同步

数据工程师会编写这样的 SQL 来计算批量特征:

-- a_complex_feature_generation.sql
-- 计算用户过去7天的日均消费和最大单笔消费
WITH user_transactions AS (
    SELECT
        user_id,
        amount,
        DATE(created_at) as transaction_date
    FROM raw_transactions
    WHERE created_at >= CURRENT_DATE - INTERVAL '7 days'
)
SELECT
    user_id,
    AVG(daily_sum) AS avg_spend_7d,
    MAX(daily_max) AS max_spend_7d
FROM (
    SELECT
        user_id,
        transaction_date,
        SUM(amount) as daily_sum,
        MAX(amount) as daily_max
    FROM user_transactions
    GROUP BY 1, 2
) daily_stats
GROUP BY 1;

这个查询的结果被导出为 Parquet 格式并上传到 S3。接着,一个同步脚本会被触发。在真实项目中,这可能是一个 Lambda 函数或一个 Airflow DAG 中的任务。

下面是这个同步脚本的核心逻辑。这里的坑在于,直接循环写入 DynamoDB 效率极低且容易被限流。必须使用 BatchWriter

# sync_s3_to_dynamodb.py
import os
import logging
import boto3
import pandas as pd
import awswrangler as wr
from botocore.config import Config
from botocore.exceptions import ClientError

# --- 配置 ---
# 在生产环境中,这些值应来自环境变量或配置服务
TABLE_NAME = "feature_store_hybrid"
S3_BUCKET = "my-feature-s3-bucket"
S3_PREFIX = "daily_features/"
# 增加重试策略以应对可能的限流
BOTO_CONFIG = Config(
    retries={
        'max_attempts': 5,
        'mode': 'adaptive'
    }
)
DYNAMODB_CLIENT = boto3.resource('dynamodb', config=BOTO_CONFIG)
FEATURE_GROUP_NAME = "BATCH_DAILY"

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def get_latest_s3_path(bucket: str, prefix: str) -> str:
    """获取指定S3前缀下最新的文件路径 (假设按日期分区)"""
    # 在实际应用中,这里会有更复杂的逻辑来确定要处理的文件
    # 为简化,我们假设我们知道确切的文件名
    today = pd.Timestamp.now().strftime('%Y-%m-%d')
    s3_path = f"s3://{bucket}/{prefix}{today}/features.parquet"
    logging.info(f"Targeting S3 path: {s3_path}")
    return s3_path

def sync_features(s3_path: str, table_name: str):
    """从S3读取Parquet文件并批量写入DynamoDB"""
    try:
        df = wr.s3.read_parquet(path=s3_path)
        logging.info(f"Successfully read {len(df)} records from {s3_path}")
    except Exception as e:
        logging.error(f"Failed to read from S3 path {s3_path}: {e}")
        raise

    table = DYNAMODB_CLIENT.Table(table_name)
    processed_count = 0
    failed_count = 0

    # BatchWriter是处理大批量写入的正确方式
    # 它会自动处理分批、重试和未处理的项目
    with table.batch_writer() as batch:
        for index, row in df.iterrows():
            try:
                item_data = {
                    'PK': f"USER#{row['user_id']}",
                    'SK': f"FEATURES#{FEATURE_GROUP_NAME}",
                    # 将Pandas行转换为Python原生类型字典
                    # 确保处理NaN/None值
                    **{k: v for k, v in row.drop('user_id').to_dict().items() if pd.notna(v)}
                }
                batch.put_item(Item=item_data)
                processed_count += 1
                if processed_count % 1000 == 0:
                    logging.info(f"Processed {processed_count} items...")
            except Exception as e:
                logging.warning(f"Failed to process row for user_id {row['user_id']}: {e}")
                failed_count += 1
    
    logging.info(f"Sync complete. Processed: {processed_count}, Failed: {failed_count}")


if __name__ == "__main__":
    target_s3_path = get_latest_s3_path(S3_BUCKET, S3_PREFIX)
    sync_features(target_s3_path, TABLE_NAME)

这个脚本是生产级的:它有配置、日志、错误处理,并使用了 BatchWriter 来保证效率和健壮性。

3. Kanban 状态管理

为了管理特征的生命周期,我创建了一个简单的命令行工具。

# manage_feature.py
import boto3
import click
import logging
from datetime import datetime, timezone

TABLE_NAME = "feature_store_hybrid"
DYNAMODB_CLIENT = boto3.resource('dynamodb')
table = DYNAMODB_CLIENT.Table(TABLE_NAME)

VALID_STATUSES = ["BACKLOG", "DEVELOPING", "STAGING", "PRODUCTION", "DEPRECATED"]

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

@click.group()
def cli():
    """A simple CLI to manage feature metadata (our Kanban board)."""
    pass

@cli.command()
@click.option('--name', required=True, help='The name of the feature.')
@click.option('--status', required=True, type=click.Choice(VALID_STATUSES), help='The new status of the feature.')
@click.option('--owner', default='unknown', help='The owner of the feature.')
@click.option('--description', default='', help='A short description of the feature.')
def set_status(name: str, status: str, owner: str, description: str):
    """Sets or updates the status of a feature."""
    pk = f"FEATURE_META#{name}"
    sk = "METADATA"
    timestamp = datetime.now(timezone.utc).isoformat()

    logging.info(f"Updating feature '{name}' to status '{status}'")
    
    # 使用 UpdateItem 以便只更新指定字段
    # 这也允许我们创建新条目或更新现有条目
    try:
        response = table.update_item(
            Key={'PK': pk, 'SK': sk},
            UpdateExpression="SET #status = :s, owner = :o, description = :d, updated_ts = :ts",
            ExpressionAttributeNames={
                '#status': 'status' # 'status' is a reserved keyword in DynamoDB
            },
            ExpressionAttributeValues={
                ':s': status,
                ':o': owner,
                ':d': description,
                ':ts': timestamp,
            },
            ReturnValues="UPDATED_NEW"
        )
        logging.info(f"Successfully updated. Response: {response}")
        click.echo(f"Feature '{name}' is now in status '{status}'.")
    except Exception as e:
        logging.error(f"Failed to update feature '{name}': {e}")
        click.echo(f"Error: Could not update feature status.", err=True)

if __name__ == '__main__':
    cli()

在 CI/CD 流水线中,当一个特征开发完成并通过测试后,可以自动执行 python manage_feature.py set-status --name avg_spend_7d --status PRODUCTION --owner data-eng-team。BentoML 服务可以查询这些元数据,只加载状态为 PRODUCTION 的特征。

4. 核心:BentoML 混合特征服务

这是所有部分的交汇点。服务需要高性能地并行从 DynamoDB 和 SQL 数据仓库获取数据。为了让示例可独立运行,我将使用 DuckDB 模拟 SQL 数据仓库。

首先,bentofile.yaml 定义项目结构和依赖:

# bentofile.yaml
service: "feature_fusion_service.py:svc"
labels:
  owner: ml-platform-team
  project: hybrid-feature-store
include:
  - "*.py"
  - "models/" # 假设有一个虚拟模型
python:
  packages:
    - scikit-learn
    - pandas
    - boto3
    - pydantic
    - duckdb
    - numpy
models:
  - tag: dummy_classifier:latest
    path: "models/dummy_model.joblib"

现在是关键的 service.py。注意 async def 的使用,这是实现 I/O 并行、降低延迟的关键。

# feature_fusion_service.py
import os
import asyncio
import logging
import bentoml
import numpy as np
import pandas as pd
import boto3
import duckdb
from pydantic import BaseModel
from typing import List

# --- 配置 ---
DYNAMODB_TABLE_NAME = "feature_store_hybrid"
DUCKDB_PATH = "offline_features.db" # 模拟数据仓库
BOTO_CONFIG = ... # 同步脚本中的配置

# 初始化客户端。在BentoML中,这些应在服务级别初始化以复用连接
dynamodb = boto3.resource('dynamodb', config=BOTO_CONFIG)
table = dynamodb.Table(DYNAMODB_TABLE_NAME)
duckdb_conn = duckdb.connect(database=DUCKDB_PATH, read_only=True)

# 模拟一个模型
from sklearn.linear_model import LogisticRegression
# 在实际项目中,这个模型会被 `bentoml models pull` 拉取
# 为了可复现性,我们在这里创建一个
if not os.path.exists("models/dummy_model.joblib"):
    from sklearn.datasets import make_classification
    from joblib import dump
    X, y = make_classification(n_samples=100, n_features=5, n_informative=3, n_redundant=0, random_state=42)
    model = LogisticRegression().fit(X, y)
    os.makedirs("models", exist_ok=True)
    dump(model, "models/dummy_model.joblib")


# --- 服务定义 ---
dummy_model_runner = bentoml.sklearn.get("dummy_classifier:latest").to_runner()

svc = bentoml.Service(
    "hybrid_feature_service",
    runners=[dummy_model_runner],
)

# 使用Pydantic进行输入验证
class UserRequest(BaseModel):
    user_id: int
    contextual_features: List[float] # 假设有些特征是请求时才有的

# --- 核心数据获取逻辑 ---
async def get_online_features(user_id: int) -> dict:
    """异步从DynamoDB获取实时特征"""
    loop = asyncio.get_running_loop()
    try:
        # boto3 本身不是异步的,所以用 run_in_executor 包装它
        # 这样就不会阻塞事件循环
        response = await loop.run_in_executor(
            None, # 使用默认的ThreadPoolExecutor
            lambda: table.get_item(
                Key={'PK': f"USER#{user_id}", 'SK': 'FEATURES#REALTIME'},
                ConsistentRead=True # 强一致性读,确保拿到最新数据
            )
        )
        return response.get('Item', {})
    except Exception as e:
        logging.error(f"DynamoDB query failed for user {user_id}: {e}")
        return {}

async def get_offline_features(user_id: int) -> dict:
    """异步从DuckDB (模拟SQL数仓) 获取批量特征"""
    loop = asyncio.get_running_loop()
    try:
        # DuckDB的Python客户端也不是原生的async,同样需要包装
        query = f"SELECT avg_spend_7d, max_spend_7d FROM daily_features WHERE user_id = {user_id}"
        result_df = await loop.run_in_executor(
            None,
            lambda: duckdb_conn.execute(query).fetchdf()
        )
        if not result_df.empty:
            return result_df.to_dict('records')[0]
        return {}
    except Exception as e:
        logging.error(f"DuckDB query failed for user {user_id}: {e}")
        return {}

# --- API 端点 ---
@svc.api(input=bentoml.io.JSON(pydantic_model=UserRequest), output=bentoml.io.JSON())
async def predict(request: UserRequest):
    user_id = request.user_id
    logging.info(f"Received request for user_id: {user_id}")
    
    # 使用 asyncio.gather 实现并行 I/O
    # 这是性能优化的关键点,两个I/O操作会同时进行
    online_features_task = get_online_features(user_id)
    offline_features_task = get_offline_features(user_id)
    
    results = await asyncio.gather(online_features_task, offline_features_task)
    
    online_features = results[0]
    offline_features = results[1]
    
    # 特征融合
    # 这里的顺序和选择逻辑非常重要,必须严格定义
    # 在真实项目中,会从特征元数据中查询哪些是PRODUCTION状态的特征
    feature_vector = []
    feature_vector.extend(request.contextual_features)
    # 假设模型需要以下特征,并按此顺序
    feature_vector.append(online_features.get('session_duration_sec', 0.0))
    feature_vector.append(offline_features.get('avg_spend_7d', 0.0))
    
    # 确保特征向量是NumPy数组,并有正确的形状
    feature_array = np.array(feature_vector).reshape(1, -1)
    
    # 调用模型推理
    prediction_result = await dummy_model_runner.predict.async_run(feature_array)
    
    return {
        "user_id": user_id,
        "prediction": int(prediction_result[0]),
        "features_used": {
            "online": list(online_features.keys()),
            "offline": list(offline_features.keys()),
            "contextual": len(request.contextual_features)
        }
    }

这段代码展示了架构的核心价值:BentoML 不仅是一个模型服务器,更是一个数据编排和业务逻辑的微服务框架。通过 asyncio.gather,我们最大化了 I/O 效率,这是满足低延迟需求的关键。

遗留问题与未来迭代

这个架构虽然解决了核心的混合特征供给问题,但它并非终点。

首先,目前的 Kanban 流程是手动的。一个完整的 MLOps 体系需要将 manage_feature.py 的调用与代码仓库的 Webhooks(例如,Jira ticket 状态变更、GitHub PR merge)深度集成,实现真正的流程自动化。

其次,在实时路径中直接查询 SQL 数据仓库存在性能风险。虽然对于某些准实时场景(如分钟级更新的特征)可行,但对于要求极致低延迟的场景,所有特征最终都应预计算并推送到 DynamoDB。这需要在特征新鲜度和架构复杂度之间做出权衡。

再者,我们没有解决特征的 Schema 管理和版本控制。当特征的计算逻辑或数据类型发生变化时,如何保证线上模型不受影响?这需要引入一个更正式的特征注册表(Feature Registry)机制,它会是整个系统的下一个演进方向。

最后,系统的可观测性需要加强。我们需要为特征的延迟、填充率、数据分布漂移等建立详细的监控和告警,确保我们不仅能提供特征,还能保证特征的质量。


  目录