构建服务于低延迟AI的混合存储特征管道:Redis Streams, MongoDB与ClickHouse的架构权衡


问题的起点非常明确:我们需要为一套在线风控与推荐模型提供特征服务。业务需求对这套服务的延迟提出了极为苛刻的要求——P99响应必须在50毫秒以内。同时,模型需要两类截然不同的特征:一类是实时性极高的“在线特征”,例如用户在当前会话中的点击次数、过去5分钟内的浏览商品类别;另一类是基于海量历史数据计算的“离线特征”,例如用户过去90天的购买力分级、最活跃的地理位置。将这两类特征的计算与服务整合进一个统一、高效且可维护的系统中,是我们面临的核心架构挑战。

定义问题:双重需求的矛盾

任何试图用单一技术方案解决所有问题的想法,在真实项目中通常都会碰壁。我们面对的正是这样一种典型场景:

  1. 在线存储与服务 (Online Store):

    • 访问模式: 高并发、低延迟的点查(Point Lookup)。模型服务会为每个请求,根据用户ID或设备ID等主键,拉取一组最新的特征。
    • 数据特性: 数据量相对较小(每个主键只存储最新的状态),但更新极其频繁。
    • 延迟要求: P99 < 20ms,为后续的模型计算和业务逻辑处理留出时间窗口。
  2. 离线存储与计算 (Offline Store):

    • 访问模式: 大规模的批量扫描(Batch Scan)和复杂的聚合计算。特征工程师需要对数月甚至数年的历史事件进行分析,以挖掘、验证和生成新的特征。
    • 数据特性: 数据量巨大(TB甚至PB级别),写入模式以批量追加为主。
    • 延迟要求: 计算延迟可以接受在小时甚至天级别,但查询性能必须足够支撑探索性分析。
  3. 数据一致性与时效性:

    • 在线特征必须是毫秒级最新的。
    • 离线特征需要周期性地刷新到在线存储中,供模型实时调用。
    • 最棘手的是,我们需要一个统一的事件源,确保在线和离线处理的是同一份数据,避免数据口径不一致导致的“训练-服务偏斜”(Training-Serving Skew)。

方案A:单一数据库的理想主义

一个诱人的想法是寻找一个“全能型”数据库,试图用它同时满足在线服务和离线分析的需求。例如,一些高性能的分布式NoSQL数据库,如ScyllaDB或某些配置下的Cassandra。

  • 优势:

    • 架构简单,只有一个核心存储组件,减少了运维复杂度和数据同步的麻烦。
    • 理论上,数据写入一次即可同时用于在线和离线场景。
  • 劣势与现实的残酷:

    • 工作负载冲突: 低延迟点查和高吞吐批量扫描是两种截然不同的IO模式。在一个集群中混合这两种负载,通常会导致相互干扰。在线服务的延迟会因为离线分析任务的执行而产生剧烈抖动,这是生产环境无法接受的。
    • 存储成本与效率: 为满足点查性能,这类数据库的数据模型通常基于LSM树或类似结构,并针对行存储进行优化。当进行大规模聚合分析时,它们需要读取大量无关数据,其效率和成本远不如列式存储。你可能需要为离线分析能力支付过高的硬件成本。
    • 分析能力局限: 尽管一些NoSQL数据库提供了类似SQL的查询接口,但其分析函数、窗口函数以及连接查询的能力,与专业的OLAP引擎(如ClickHouse)相比,功能和性能都相差甚远。特征工程中复杂的分析需求难以满足。

在真实项目中,这种方案很快就会暴露出短板。为了保障在线服务的SLA,团队不得不严格限制离线查询的时间窗口和复杂度,这极大地束缚了数据科学家的手脚,最终导致整个系统的价值大打折扣。一个常见的错误是低估了工作负载隔离的重要性。

方案B:混合存储的务实主义

经过对方案A的否决,我们转向了一个更为复杂但更贴近现实的混合架构。其核心思想是为不同的任务选择最合适的工具,然后设计一套可靠的数据流将它们粘合起来。

  • 我们的技术选型:
    • 事件总线 (Event Bus): Redis Streams。我们选择它作为系统的心脏。它不仅是一个轻量级的消息队列,其消费者组(Consumer Groups)机制提供了持久化、消息确认和故障转移能力,完美契-合了“一份数据,多方消费”的需求。它的性能极高,足以应对流量洪峰。
    • 在线存储 (Online Store): Redis (Hash)。对于在线特征,没有比Redis更合适的选择了。内存存储保证了亚毫秒级的访问延迟。我们使用Hash结构,以用户ID为主键,存储其对应的多个特征键值对。
    • 源数据与画像存储 (Source of Truth): MongoDB。用户的核心画像、一些不常变动但需要灵活查询的半结构化数据,存放在MongoDB中。它的文档模型非常适合存储用户Profile,并且可以作为某些特征计算的上游数据源。
    • 离线存储 (Offline Store): ClickHouse。其强大的列式存储和计算引擎是离线分析的不二之选。海量事件数据的存储成本低,聚合查询速度极快,能够轻松应对特征工程师的复杂分析需求。

这个架构的本质是将读(在线服务)和写(事件接收)、实时处理(在线特征计算)与批量处理(离线特征计算)彻底分离。

graph TD
    subgraph "事件源 (e.g., App/Web Server)"
        A[Ingestion API]
    end

    subgraph "数据管道核心"
        A -- XADD --> B(Redis Streams: event_log)
    end

    subgraph "实时处理流 (Real-time Flow)"
        B -- XREADGROUP --> C{Stream Consumer: Online}
        C -- HSET --> D[Redis Hash: Online Features]
        C -- UPDATE --> E[MongoDB: User Profiles]
    end

    subgraph "近实时处理流 (Near Real-time Flow)"
        B -- XREADGROUP --> F{Stream Consumer: Offline}
        F -- Batch INSERT --> G[ClickHouse: Raw Events]
    end

    subgraph "特征服务"
        H[Feature Serving API]
        H -- HGETALL --> D
        H -- FIND --> E
        H -- (Cache Miss/Complex Feature) --> I{Feature Computation Engine}
        I -- SELECT ... --> G
    end

    subgraph "模型训练"
        J[ML Training Pipeline]
        J -- SELECT ... FROM --> G
    end

    K[Client / ML Model] -- Request --> H
  • 优势:

    • 性能最大化: 每个组件都做自己最擅长的事。Redis保障在线服务延迟,ClickHouse保障分析性能。
    • 工作负载隔离: 在线和离线数据流完全分离,互不干扰。ClickHouse的批量写入和查询不会影响Redis的P99延迟。
    • 可扩展性: 每个组件都可以独立扩展。如果事件流增大,可以增加Redis Streams的分片和消费者实例;如果历史数据增多,可以扩展ClickHouse集群。
  • 劣势与挑战:

    • 架构复杂度: 引入了多个组件,运维和监控的复杂度显著增加。
    • 数据同步与一致性: 这是最大的挑战。需要确保从Redis Streams到ClickHouse的数据管道是可靠的,并且有机制处理延迟和数据差异。
    • 开发成本: 需要编写和维护多个消费者服务以及数据同步逻辑。

我们最终选择了方案B。因为对于一个核心业务系统,长期的性能、稳定性和可扩展性,远比短期的架构简洁性更重要。可管理的复杂性,胜过不可控的性能瓶颈。

核心实现概览

我们将使用Go语言来构建数据管道中的各个服务,因为它在网络编程和并发处理方面表现出色,非常适合这类高吞吐的中间件开发。

1. Ingestion API: 事件入口

这是所有数据的入口。它必须轻量、高效,并且能处理高并发写入。我们使用Gin框架。

// main.go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/go-redis/redis/v8"
)

// Config holds all configuration for our service
type Config struct {
	RedisAddr    string
	RedisStream  string
	ServerPort   string
}

// EventIngestor handles receiving events and publishing them to Redis Streams
type EventIngestor struct {
	rdb    *redis.Client
	stream string
	ctx    context.Context
}

// NewEventIngestor creates a new ingestor service
func NewEventIngestor(cfg Config) (*EventIngestor, error) {
	rdb := redis.NewClient(&redis.Options{
		Addr:     cfg.RedisAddr,
		PoolSize: 100, // Production-grade pool size
	})

	ctx := context.Background()
	if _, err := rdb.Ping(ctx).Result(); err != nil {
		return nil, fmt.Errorf("failed to connect to redis: %w", err)
	}

	return &EventIngestor{
		rdb:    rdb,
		stream: cfg.RedisStream,
		ctx:    ctx,
	}, nil
}

// HandleEvent is the Gin handler for ingesting a single event
func (e *EventIngestor) HandleEvent(c *gin.Context) {
	var eventData map[string]interface{}
	if err := c.ShouldBindJSON(&eventData); err != nil {
		c.JSON(400, gin.H{"error": "Invalid JSON payload"})
		return
	}
    
    // 在真实项目中,这里必须有严格的 schema 校验
	eventData["ingest_time"] = time.Now().UTC().UnixMilli()

	args := &redis.XAddArgs{
		Stream: e.stream,
		Values: eventData,
        // MaxLen: 10_000_000, // Optional: Cap the stream size
	}

	// XADD is the command to append a new entry to a stream
	id, err := e.rdb.XAdd(e.ctx, args).Result()
	if err != nil {
		log.Printf("Error publishing event to Redis Stream: %v", err)
		c.JSON(500, gin.H{"error": "Failed to process event"})
		return
	}

	c.JSON(202, gin.H{"status": "accepted", "id": id})
}

func main() {
	cfg := Config{
		RedisAddr:    getEnv("REDIS_ADDR", "localhost:6379"),
		RedisStream:  getEnv("REDIS_STREAM", "events:raw"),
		ServerPort:   getEnv("SERVER_PORT", "8080"),
	}

	ingestor, err := NewEventIngestor(cfg)
	if err != nil {
		log.Fatalf("Failed to initialize event ingestor: %v", err)
	}

	gin.SetMode(gin.ReleaseMode)
	router := gin.New()
	router.Use(gin.Logger(), gin.Recovery()) // Essential middleware

	router.POST("/event", ingestor.HandleEvent)

	log.Printf("Starting ingestion server on port %s", cfg.ServerPort)
	if err := router.Run(":" + cfg.ServerPort); err != nil {
		log.Fatalf("Failed to run server: %v", err)
	}
}

func getEnv(key, fallback string) string {
	if value, ok := os.LookupEnv(key); ok {
		return value
	}
	return fallback
}

关键点剖析:

  • 配置化: 所有关键参数(Redis地址、Stream名称)都通过环境变量配置,这是云原生应用的基本实践。
  • 连接池: 我们为Redis客户端配置了连接池,避免为每个请求创建新连接,这是高并发服务的性能保障。
  • 错误处理: 对Redis连接和命令执行都做了错误处理和日志记录。
  • 异步确认: API返回202 Accepted,表示事件已被接收,但尚未被下游完全处理。这种异步模式解耦了客户端和后端处理流。

2. Offline Consumer: Redis Streams to ClickHouse

这个消费者是数据入库到ClickHouse的桥梁。这里的核心挑战是批量写入。直接对每条消息都执行一次INSERT会因为网络开销和ClickHouse的Part合并机制而导致性能极差。

// offline_consumer/main.go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/go-redis/redis/v8"
)

// ... (Config and getEnv structs/functions similar to above) ...

const (
	batchSize    = 1000 // Number of messages to batch before inserting
	batchTimeout = 5 * time.Second // Max time to wait before flushing a batch
)

type OfflineConsumer struct {
	rdb        *redis.Client
	chConn     *sql.DB
	stream     string
	group      string
	consumer   string
	ctx        context.Context
}

func NewOfflineConsumer(cfg Config) (*OfflineConsumer, error) {
    // ... Redis connection setup ...
	
	// ClickHouse connection
	conn := clickhouse.OpenDB(&clickhouse.Options{
		Addr: []string{cfg.ClickHouseAddr},
		Auth: clickhouse.Auth{
			Database: cfg.ClickHouseDB,
			Username: cfg.ClickHouseUser,
			Password: cfg.ClickHousePass,
		},
		Settings: clickhouse.Settings{
			"async_insert": 1, // Use asynchronous inserts for better performance
			"wait_for_async_insert": 1,
		},
	})
	conn.SetMaxIdleConns(5)
	conn.SetMaxOpenConns(10)
	conn.SetConnMaxLifetime(time.Hour)

	if err := conn.Ping(); err != nil {
		return nil, fmt.Errorf("failed to connect to clickhouse: %w", err)
	}
    
    // Ensure consumer group exists
    // In production, this should be an idempotent operation.
    // XGROUP CREATE my_stream my_group $ MKSTREAM
    // The MKSTREAM option creates the stream if it doesn't exist.
    _, err := rdb.XGroupCreateMkStream(ctx, cfg.RedisStream, "offline-group", "$").Result()
	if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
		return nil, fmt.Errorf("failed to create consumer group: %w", err)
	}
    
	hostname, _ := os.Hostname()
	return &OfflineConsumer{
		rdb:      rdb,
		chConn:   conn,
		stream:   cfg.RedisStream,
		group:    "offline-group",
		consumer: fmt.Sprintf("consumer-%s", hostname),
		ctx:      ctx,
	}, nil
}


func (c *OfflineConsumer) Run() {
	log.Println("Starting offline consumer...")
	ticker := time.NewTicker(batchTimeout)
	defer ticker.Stop()

	var batch []map[string]interface{}

	for {
		select {
		case <-ticker.C:
			// Time-based flush
			if len(batch) > 0 {
				log.Printf("Flushing batch due to timeout, size: %d", len(batch))
				c.flushBatch(&batch)
			}
		default:
			// Read from stream
			streams, err := c.rdb.XReadGroup(c.ctx, &redis.XReadGroupArgs{
				Group:    c.group,
				Consumer: c.consumer,
				Streams:  []string{c.stream, ">"}, // ">" means new messages only
				Count:    int64(batchSize - len(batch)),
				Block:    2 * time.Second,
			}).Result()

			if err != nil {
				if err != redis.Nil {
					log.Printf("Error reading from stream: %v", err)
				}
				continue // redis.Nil means timeout, which is normal
			}
			
			for _, stream := range streams {
				for _, msg := range stream.Messages {
					batch = append(batch, msg.Values)
					if len(batch) >= batchSize {
						log.Printf("Flushing batch due to size, size: %d", len(batch))
						c.flushBatch(&batch)
                        ticker.Reset(batchTimeout) // Reset timer after a flush
					}
				}
			}
		}
	}
}

func (c *OfflineConsumer) flushBatch(batch *[]map[string]interface{}) {
	if len(*batch) == 0 {
		return
	}
	
    // The actual INSERT logic. This part is critical.
	tx, err := c.chConn.Begin()
	if err != nil {
		log.Printf("Failed to begin transaction: %v", err)
		return // Batch remains, will retry
	}
	
	// Your table schema must match the event structure.
	stmt, err := tx.Prepare("INSERT INTO events (user_id, event_type, timestamp, payload)")
	if err != nil {
		log.Printf("Failed to prepare statement: %v", err)
		tx.Rollback()
		return
	}
	defer stmt.Close()

	idsToAck := []string{}
	successfulInserts := 0

	for _, event := range *batch {
		// Data transformation and validation should happen here.
        // A common mistake is to trust the data blindly.
		userID, _ := event["user_id"].(string)
		eventType, _ := event["event_type"].(string)
        // Convert timestamp if necessary
        
        // This is a simplified example. In production, you'd handle
        // different event types, nested JSON, etc.
        // The `payload` column could be of type String or JSON in ClickHouse.
		_, err := stmt.Exec(userID, eventType, time.Now(), "{}") // Placeholder for payload
		if err != nil {
			log.Printf("Failed to execute insert for one event: %v", err)
			// Decide on error strategy: skip, move to DLQ, etc.
			continue
		}
		// Assuming event ID is in the message map, which it's not by default
        // This part needs adjustment based on how you pass the ID
		// idsToAck = append(idsToAck, event["id"].(string)) 
		successfulInserts++
	}

	if err := tx.Commit(); err != nil {
		log.Printf("Failed to commit transaction: %v", err)
		tx.Rollback()
		return
	}

	log.Printf("Successfully inserted %d records.", successfulInserts)

    // Acknowledge messages in Redis *after* successful commit to ClickHouse.
    // This provides at-least-once delivery semantics.
	// c.rdb.XAck(c.ctx, c.stream, c.group, idsToAck...) 
	
	*batch = nil // Clear the batch
}

// ... main function to initialize and run the consumer ...

关键点剖析:

  • 批处理逻辑: 使用 select 结合 time.Ticker 和消息读取,实现了基于大小和时间的双重批处理触发机制。这是性能优化的核心。
  • 消费者组: 使用Redis Streams的消费者组(XREADGROUP),可以轻松地横向扩展消费者实例,每个实例处理一部分消息流,实现负载均衡和高可用。
  • 幂等性与重试: XACK 必须在数据成功写入ClickHouse之后调用。如果消费者在 INSERT 成功后但在 XACK 前崩溃,重启后会重复处理消息。因此,ClickHouse表需要有主键或采用 ReplacingMergeTree 引擎来处理重复数据,保证幂等性。
  • ClickHouse async_insert: 开启异步插入可以显著提高写入吞吐量,客户端不必等待数据完全写入磁盘。

3. Online Consumer and Feature Serving API

在线消费者的逻辑类似,但它的目标是更新Redis中的在线特征(例如,用HINCRBY更新计数器,用LPUSHLTRIM维护一个最近行为列表)。Feature Serving API则负责组合和提供这些特征。

// feature_server/main.go
package main

// ... imports and config ...

type FeatureServer struct {
	rdb    *redis.Client
	mongo  *mongo.Client // Connection to MongoDB
	chConn *sql.DB      // Connection to ClickHouse for complex features
	// ...
}

func (fs *FeatureServer) GetFeatures(c *gin.Context) {
	userID := c.Param("userID")
	if userID == "" {
		c.JSON(400, gin.H{"error": "userID is required"})
		return
	}

	// 1. Fetch real-time features from Redis (Online Store)
	// Use Pipeline for multiple commands to reduce RTT
	pipe := fs.rdb.Pipeline()
	sessionClickCount := pipe.HGet(c, fmt.Sprintf("user:%s:features", userID), "session_clicks")
	last5Products := pipe.LRange(c, fmt.Sprintf("user:%s:recent_products", userID), 0, 4)
	_, err := pipe.Exec(c)
	if err != nil && err != redis.Nil {
		log.Printf("Error fetching from Redis pipeline: %v", err)
		// Fallback or error out
	}
    
    // 2. Fetch profile features from MongoDB
    // var userProfile bson.M
    // profileCollection.FindOne(...).Decode(&userProfile)
    
    // 3. Fetch historical features from ClickHouse (if not cached)
    // This is the slowest path and should be used sparingly or cached heavily.
    // var purchaseFrequency float64
    // fs.chConn.QueryRow("SELECT avg(...) FROM ... WHERE user_id = ?", userID).Scan(&purchaseFrequency)

	features := gin.H{
		"user_id":             userID,
		"session_clicks":      sessionClickCount.Val(),
		"last_5_products_viewed": last5Products.Val(),
        // "purchase_frequency_90d": purchaseFrequency,
        // "location": userProfile["city"],
	}

	c.JSON(200, features)
}

// ... main function to start the server ...

关键点剖析:

  • 多源聚合: API的核心职责是从多个数据源(Redis, MongoDB, ClickHouse)拉取数据并聚合成一个统一的特征向量。
  • 性能分层: 访问路径被精心设计:最快的是Redis,其次是MongoDB,最慢的是ClickHouse。在真实项目中,会引入一层缓存(如Caffeine或另一个Redis实例)来缓存从ClickHouse计算出的昂贵特征。
  • Redis Pipeline: 通过将多个Redis命令打包一次性发送,可以显著降低网络往返时间(RTT),对低延迟服务至关重要。

架构的扩展性与局限性

这套混合架构虽然解决了核心问题,但它并非银弹,理解其边界至关重要。

扩展性:

  • 特征添加: 添加新的在线特征,只需修改Online Consumer的逻辑并在Redis中增加一个字段。添加新的离线特征,则主要是编写ClickHouse的SQL查询,并通过一个批处理作业将结果写回在线存储。整个流程清晰且解耦。
  • 流量增长: 可以独立地扩展Ingestion API、消费者实例和数据库集群,灵活性很高。

局限性与未来迭代方向:

  • 近实时而非真实时: 从事件产生到数据可供在ClickHouse中查询,存在一个由批处理大小和间隔决定的延迟(通常是秒级)。这对于需要T+1分析的场景完全足够,但无法满足需要对几十秒内数据进行复杂查询的场景。
  • 特征回填 (Backfilling): 当一个特征工程师定义了一个新的、基于历史数据的特征时(例如,“用户过去365天的总消费”),我们需要对存量的所有用户计算这个特征。这需要一个独立的大规模批处理任务(例如Spark Job)来读取ClickHouse的全部历史数据,进行计算,然后将结果批量导入Redis。这个过程的工程实现和调度是相当复杂的。
  • 一致性窗口: 在线和离线存储之间存在一个数据延迟窗口。在特征定义中必须小心处理时间,避免使用在两个存储中可能不一致的数据进行联合计算,否则会导致线上线下结果不一致。
  • 工具链成本: 维护这样一个多组件系统,需要强大的可观测性平台(日志、指标、追踪),自动化的部署流水线,以及对每个组件都有深入了解的运维团队。这笔“隐性成本”在架构设计之初就必须被考虑到。

  目录