我们的机器学习团队遇到了一个日益棘手的窘境:特征分裂。一部分特征是实时生成的,比如用户最后一次点击、当前会话时长,必须在10毫秒内可供模型调用;另一部分则是离线计算的批量特征,例如用户过去7天的平均购买金额、历史浏览品类分布,这些通过复杂的 SQL 在数据仓库中每日更新。模型需要同时消费这两种特征,而我们现有的架构是一堆脆弱的 Python 脚本和不稳定的缓存层,维护成本和故障率都高得无法接受。
最初的构想是构建一个统一的特征存储(Feature Store)。但这不仅仅是一个技术问题,更是一个流程问题。一个新特征从提出、开发、验证、上线到最终废弃,整个生命周期缺乏管理。特征的生产者(数据工程师)和消费者(算法工程师)之间存在巨大的沟通鸿沟。这让我联想到了软件开发中的看板(Kanban)方法。如果我们可以将特征本身视为一个工作项,用看板来追踪其状态,并让状态的变更自动触发相应的技术流程,或许能同时解决技术和流程两大难题。
这个想法催生了一个新的架构目标:构建一个由看板驱动的、混合了实时与批量数据源的特征服务。
技术选型决策
这个架构的核心是数据存储和模型服务,我的选型思考如下:
在线存储 - Amazon DynamoDB: 需求非常明确:超低延迟的键值查询。我们需要通过用户ID在个位数毫秒内获取其实时特征。DynamoDB 的性能、可扩展性和完全托管的特性使其成为不二之选。关系型数据库在这里会因为连接开销和复杂的查询计划而败下阵来。
离线存储 - SQL 数据仓库: 我们的批量特征已经存在于一个基于 SQL 的数据仓库(例如 Snowflake 或 Redshift)中。让数据工程师继续使用他们熟悉的 SQL 进行复杂的聚合、窗口函数等操作是最高效的方式。我们不需要迁移这些逻辑。
模型服务 - BentoML: 为什么不用 FastAPI 或 Flask 直接包装模型?因为我们的“服务”远不止模型推理。它需要编排数据流:接收请求,并行地从 DynamoDB 和 SQL 数据仓库拉取特征,将它们融合成一个特征向量,然后才送入模型。BentoML 对这种复杂服务逻辑的抽象、依赖管理和部署打包能力,远超一个简单的 Web 框架。它可以将数据预处理、模型推理和后处理逻辑优雅地封装在一起。
流程管理 - Kanban 概念: 我不打算引入一个完整的 Jira 或 Trello 系统,而是将看板的核心思想——状态——直接融入我们的基础设施。每个特征都会有一个元数据记录,包含它的当前状态(如
BACKLOG
,DEVELOPING
,STAGING
,PRODUCTION
,DEPRECATED
)。状态的变更是通过一个简单的 CLI 工具完成,在真实的 CI/CD 流程中,这个 CLI 命令会由 Jenkins 或 GitHub Actions 在特定阶段(如合并到主分支)自动调用。
架构实现步骤
整个系统分为两个主要流程:离线数据ETL和在线实时服务。
graph TD subgraph "离线ETL流程 (每日执行)" A[SQL数据仓库] -- 1. 复杂SQL聚合 --> B(特征计算任务); B -- 2. 生成Parquet文件 --> C[S3存储桶]; C -- 3. 触发同步任务 --> D(Python同步脚本); D -- 4. 批量写入 --> E[DynamoDB Online Store]; end subgraph "在线服务流程 (实时响应)" F[用户请求,含user_id] --> G{BentoML服务}; G -- 5a. 并行查询 --> E; G -- 5b. 并行查询 --> A; subgraph "特征融合" E -- 实时特征 --> H; A -- 批量/准实时特征 --> H; end H[合并特征向量] --> I[模型推理]; I --> J[返回预测结果]; end
1. DynamoDB 的混合数据建模
在 DynamoDB 中,我选择使用单一表设计,这能最大化灵活性。表的主键由分区键(PK)和排序键(SK)组成。
- 表名:
feature_store_hybrid
- 主键:
PK
(String),SK
(String)
我们将用它来存储两种完全不同的数据:用户特征和特征元数据。
存储用户特征:
-
PK
:USER#{user_id}
(例如:USER#12345
) -
SK
:FEATURES#{feature_group}
(例如:FEATURES#REALTIME
或FEATURES#BATCH_DAILY
) - 其他属性: 特征名和特征值,例如
last_click_ts
,session_duration_sec
。
-
存储特征元数据 (我们的“Kanban”状态):
-
PK
:FEATURE_META#{feature_name}
(例如:FEATURE_META#avg_spend_7d
) -
SK
:METADATA
- 其他属性:
status
(PRODUCTION
,DEPRECATED
),description
,owner
,update_ts
。
-
这种模式的优势在于,我们可以用一个表同时服务于数据读取和系统管理。
2. 离线 SQL 计算与同步
数据工程师会编写这样的 SQL 来计算批量特征:
-- a_complex_feature_generation.sql
-- 计算用户过去7天的日均消费和最大单笔消费
WITH user_transactions AS (
SELECT
user_id,
amount,
DATE(created_at) as transaction_date
FROM raw_transactions
WHERE created_at >= CURRENT_DATE - INTERVAL '7 days'
)
SELECT
user_id,
AVG(daily_sum) AS avg_spend_7d,
MAX(daily_max) AS max_spend_7d
FROM (
SELECT
user_id,
transaction_date,
SUM(amount) as daily_sum,
MAX(amount) as daily_max
FROM user_transactions
GROUP BY 1, 2
) daily_stats
GROUP BY 1;
这个查询的结果被导出为 Parquet 格式并上传到 S3。接着,一个同步脚本会被触发。在真实项目中,这可能是一个 Lambda 函数或一个 Airflow DAG 中的任务。
下面是这个同步脚本的核心逻辑。这里的坑在于,直接循环写入 DynamoDB 效率极低且容易被限流。必须使用 BatchWriter
。
# sync_s3_to_dynamodb.py
import os
import logging
import boto3
import pandas as pd
import awswrangler as wr
from botocore.config import Config
from botocore.exceptions import ClientError
# --- 配置 ---
# 在生产环境中,这些值应来自环境变量或配置服务
TABLE_NAME = "feature_store_hybrid"
S3_BUCKET = "my-feature-s3-bucket"
S3_PREFIX = "daily_features/"
# 增加重试策略以应对可能的限流
BOTO_CONFIG = Config(
retries={
'max_attempts': 5,
'mode': 'adaptive'
}
)
DYNAMODB_CLIENT = boto3.resource('dynamodb', config=BOTO_CONFIG)
FEATURE_GROUP_NAME = "BATCH_DAILY"
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def get_latest_s3_path(bucket: str, prefix: str) -> str:
"""获取指定S3前缀下最新的文件路径 (假设按日期分区)"""
# 在实际应用中,这里会有更复杂的逻辑来确定要处理的文件
# 为简化,我们假设我们知道确切的文件名
today = pd.Timestamp.now().strftime('%Y-%m-%d')
s3_path = f"s3://{bucket}/{prefix}{today}/features.parquet"
logging.info(f"Targeting S3 path: {s3_path}")
return s3_path
def sync_features(s3_path: str, table_name: str):
"""从S3读取Parquet文件并批量写入DynamoDB"""
try:
df = wr.s3.read_parquet(path=s3_path)
logging.info(f"Successfully read {len(df)} records from {s3_path}")
except Exception as e:
logging.error(f"Failed to read from S3 path {s3_path}: {e}")
raise
table = DYNAMODB_CLIENT.Table(table_name)
processed_count = 0
failed_count = 0
# BatchWriter是处理大批量写入的正确方式
# 它会自动处理分批、重试和未处理的项目
with table.batch_writer() as batch:
for index, row in df.iterrows():
try:
item_data = {
'PK': f"USER#{row['user_id']}",
'SK': f"FEATURES#{FEATURE_GROUP_NAME}",
# 将Pandas行转换为Python原生类型字典
# 确保处理NaN/None值
**{k: v for k, v in row.drop('user_id').to_dict().items() if pd.notna(v)}
}
batch.put_item(Item=item_data)
processed_count += 1
if processed_count % 1000 == 0:
logging.info(f"Processed {processed_count} items...")
except Exception as e:
logging.warning(f"Failed to process row for user_id {row['user_id']}: {e}")
failed_count += 1
logging.info(f"Sync complete. Processed: {processed_count}, Failed: {failed_count}")
if __name__ == "__main__":
target_s3_path = get_latest_s3_path(S3_BUCKET, S3_PREFIX)
sync_features(target_s3_path, TABLE_NAME)
这个脚本是生产级的:它有配置、日志、错误处理,并使用了 BatchWriter
来保证效率和健壮性。
3. Kanban 状态管理
为了管理特征的生命周期,我创建了一个简单的命令行工具。
# manage_feature.py
import boto3
import click
import logging
from datetime import datetime, timezone
TABLE_NAME = "feature_store_hybrid"
DYNAMODB_CLIENT = boto3.resource('dynamodb')
table = DYNAMODB_CLIENT.Table(TABLE_NAME)
VALID_STATUSES = ["BACKLOG", "DEVELOPING", "STAGING", "PRODUCTION", "DEPRECATED"]
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@click.group()
def cli():
"""A simple CLI to manage feature metadata (our Kanban board)."""
pass
@cli.command()
@click.option('--name', required=True, help='The name of the feature.')
@click.option('--status', required=True, type=click.Choice(VALID_STATUSES), help='The new status of the feature.')
@click.option('--owner', default='unknown', help='The owner of the feature.')
@click.option('--description', default='', help='A short description of the feature.')
def set_status(name: str, status: str, owner: str, description: str):
"""Sets or updates the status of a feature."""
pk = f"FEATURE_META#{name}"
sk = "METADATA"
timestamp = datetime.now(timezone.utc).isoformat()
logging.info(f"Updating feature '{name}' to status '{status}'")
# 使用 UpdateItem 以便只更新指定字段
# 这也允许我们创建新条目或更新现有条目
try:
response = table.update_item(
Key={'PK': pk, 'SK': sk},
UpdateExpression="SET #status = :s, owner = :o, description = :d, updated_ts = :ts",
ExpressionAttributeNames={
'#status': 'status' # 'status' is a reserved keyword in DynamoDB
},
ExpressionAttributeValues={
':s': status,
':o': owner,
':d': description,
':ts': timestamp,
},
ReturnValues="UPDATED_NEW"
)
logging.info(f"Successfully updated. Response: {response}")
click.echo(f"Feature '{name}' is now in status '{status}'.")
except Exception as e:
logging.error(f"Failed to update feature '{name}': {e}")
click.echo(f"Error: Could not update feature status.", err=True)
if __name__ == '__main__':
cli()
在 CI/CD 流水线中,当一个特征开发完成并通过测试后,可以自动执行 python manage_feature.py set-status --name avg_spend_7d --status PRODUCTION --owner data-eng-team
。BentoML 服务可以查询这些元数据,只加载状态为 PRODUCTION
的特征。
4. 核心:BentoML 混合特征服务
这是所有部分的交汇点。服务需要高性能地并行从 DynamoDB 和 SQL 数据仓库获取数据。为了让示例可独立运行,我将使用 DuckDB 模拟 SQL 数据仓库。
首先,bentofile.yaml
定义项目结构和依赖:
# bentofile.yaml
service: "feature_fusion_service.py:svc"
labels:
owner: ml-platform-team
project: hybrid-feature-store
include:
- "*.py"
- "models/" # 假设有一个虚拟模型
python:
packages:
- scikit-learn
- pandas
- boto3
- pydantic
- duckdb
- numpy
models:
- tag: dummy_classifier:latest
path: "models/dummy_model.joblib"
现在是关键的 service.py
。注意 async def
的使用,这是实现 I/O 并行、降低延迟的关键。
# feature_fusion_service.py
import os
import asyncio
import logging
import bentoml
import numpy as np
import pandas as pd
import boto3
import duckdb
from pydantic import BaseModel
from typing import List
# --- 配置 ---
DYNAMODB_TABLE_NAME = "feature_store_hybrid"
DUCKDB_PATH = "offline_features.db" # 模拟数据仓库
BOTO_CONFIG = ... # 同步脚本中的配置
# 初始化客户端。在BentoML中,这些应在服务级别初始化以复用连接
dynamodb = boto3.resource('dynamodb', config=BOTO_CONFIG)
table = dynamodb.Table(DYNAMODB_TABLE_NAME)
duckdb_conn = duckdb.connect(database=DUCKDB_PATH, read_only=True)
# 模拟一个模型
from sklearn.linear_model import LogisticRegression
# 在实际项目中,这个模型会被 `bentoml models pull` 拉取
# 为了可复现性,我们在这里创建一个
if not os.path.exists("models/dummy_model.joblib"):
from sklearn.datasets import make_classification
from joblib import dump
X, y = make_classification(n_samples=100, n_features=5, n_informative=3, n_redundant=0, random_state=42)
model = LogisticRegression().fit(X, y)
os.makedirs("models", exist_ok=True)
dump(model, "models/dummy_model.joblib")
# --- 服务定义 ---
dummy_model_runner = bentoml.sklearn.get("dummy_classifier:latest").to_runner()
svc = bentoml.Service(
"hybrid_feature_service",
runners=[dummy_model_runner],
)
# 使用Pydantic进行输入验证
class UserRequest(BaseModel):
user_id: int
contextual_features: List[float] # 假设有些特征是请求时才有的
# --- 核心数据获取逻辑 ---
async def get_online_features(user_id: int) -> dict:
"""异步从DynamoDB获取实时特征"""
loop = asyncio.get_running_loop()
try:
# boto3 本身不是异步的,所以用 run_in_executor 包装它
# 这样就不会阻塞事件循环
response = await loop.run_in_executor(
None, # 使用默认的ThreadPoolExecutor
lambda: table.get_item(
Key={'PK': f"USER#{user_id}", 'SK': 'FEATURES#REALTIME'},
ConsistentRead=True # 强一致性读,确保拿到最新数据
)
)
return response.get('Item', {})
except Exception as e:
logging.error(f"DynamoDB query failed for user {user_id}: {e}")
return {}
async def get_offline_features(user_id: int) -> dict:
"""异步从DuckDB (模拟SQL数仓) 获取批量特征"""
loop = asyncio.get_running_loop()
try:
# DuckDB的Python客户端也不是原生的async,同样需要包装
query = f"SELECT avg_spend_7d, max_spend_7d FROM daily_features WHERE user_id = {user_id}"
result_df = await loop.run_in_executor(
None,
lambda: duckdb_conn.execute(query).fetchdf()
)
if not result_df.empty:
return result_df.to_dict('records')[0]
return {}
except Exception as e:
logging.error(f"DuckDB query failed for user {user_id}: {e}")
return {}
# --- API 端点 ---
@svc.api(input=bentoml.io.JSON(pydantic_model=UserRequest), output=bentoml.io.JSON())
async def predict(request: UserRequest):
user_id = request.user_id
logging.info(f"Received request for user_id: {user_id}")
# 使用 asyncio.gather 实现并行 I/O
# 这是性能优化的关键点,两个I/O操作会同时进行
online_features_task = get_online_features(user_id)
offline_features_task = get_offline_features(user_id)
results = await asyncio.gather(online_features_task, offline_features_task)
online_features = results[0]
offline_features = results[1]
# 特征融合
# 这里的顺序和选择逻辑非常重要,必须严格定义
# 在真实项目中,会从特征元数据中查询哪些是PRODUCTION状态的特征
feature_vector = []
feature_vector.extend(request.contextual_features)
# 假设模型需要以下特征,并按此顺序
feature_vector.append(online_features.get('session_duration_sec', 0.0))
feature_vector.append(offline_features.get('avg_spend_7d', 0.0))
# 确保特征向量是NumPy数组,并有正确的形状
feature_array = np.array(feature_vector).reshape(1, -1)
# 调用模型推理
prediction_result = await dummy_model_runner.predict.async_run(feature_array)
return {
"user_id": user_id,
"prediction": int(prediction_result[0]),
"features_used": {
"online": list(online_features.keys()),
"offline": list(offline_features.keys()),
"contextual": len(request.contextual_features)
}
}
这段代码展示了架构的核心价值:BentoML 不仅是一个模型服务器,更是一个数据编排和业务逻辑的微服务框架。通过 asyncio.gather
,我们最大化了 I/O 效率,这是满足低延迟需求的关键。
遗留问题与未来迭代
这个架构虽然解决了核心的混合特征供给问题,但它并非终点。
首先,目前的 Kanban 流程是手动的。一个完整的 MLOps 体系需要将 manage_feature.py
的调用与代码仓库的 Webhooks(例如,Jira ticket 状态变更、GitHub PR merge)深度集成,实现真正的流程自动化。
其次,在实时路径中直接查询 SQL 数据仓库存在性能风险。虽然对于某些准实时场景(如分钟级更新的特征)可行,但对于要求极致低延迟的场景,所有特征最终都应预计算并推送到 DynamoDB。这需要在特征新鲜度和架构复杂度之间做出权衡。
再者,我们没有解决特征的 Schema 管理和版本控制。当特征的计算逻辑或数据类型发生变化时,如何保证线上模型不受影响?这需要引入一个更正式的特征注册表(Feature Registry)机制,它会是整个系统的下一个演进方向。
最后,系统的可观测性需要加强。我们需要为特征的延迟、填充率、数据分布漂移等建立详细的监控和告警,确保我们不仅能提供特征,还能保证特征的质量。