为Knative无服务器AI工作负载构建基于eBPF的零侵入可观测性


我们团队维护着一个基于 Knative Serving 的 AI 推理服务。它的核心功能是接收一段文本,通过一个预训练模型将其转换为 NumPy 向量,然后在 Weaviate 向量数据库中执行相似性搜索。Knative 的按需缩容至零特性极大地节约了 GPU 资源成本,但在可观测性上却带来了巨大的麻烦。

传统的监控手段在这里几乎完全失灵。基于 Sidecar 的 Service Mesh 方案,如 Istio,可以捕获服务间的网络流量,但无法洞察容器内部的执行细节。在 Pod 内部署监控代理(Agent)又与 Serverless 的轻量化、快速启动理念背道而驰,增加了镜像体积和冷启动时间。

我们面临的具体痛点是:

  1. 冷启动黑盒: 一个请求触发了 Pod 的冷启动,我们如何精确度量从 Pod 创建到应用代码开始处理请求的完整耗时?
  2. 内部性能瓶颈: 在一次请求处理中,NumPy 向量化计算究竟占用了多少时间?调用 Weaviate 的 gRPC 查询又耗时多久?这些内部细节,不修改应用代码几乎无法获取。
  3. 短暂生命周期: Pod 可能只存活几十秒来处理一两个请求,传统的监控代理还没来得及完成数据上报,Pod 就已经被销毁了。

为了解决这个难题,我们决定绕过应用层,直接下沉到操作系统内核层,利用 eBPF 技术构建一套零侵入、低开销的可观测性系统。目标是不修改一行应用代码,就能精确捕获到上述所有关键性能指标。

初步构想与技术选型

我们的核心思路是利用 eBPF 的多种探针类型,像手术刀一样精确地切入到系统的关键路径中:

  • Kprobes (Kernel Probes): 挂载到内核函数上。我们将用它来追踪网络相关的系统调用,如 tcp_connect, tcp_sendmsg, tcp_recvmsg,以此来监控应用与 Weaviate 数据库之间的 gRPC 通信耗时。
  • Uprobes (User-space Probes): 挂载到用户空间程序的函数上。这是解决问题的关键。通过分析 Python 解释器和 NumPy 库的二进制文件,我们可以找到执行向量计算的核心函数,并在其入口和出口挂载探针,从而精确计算其执行时间。
  • Tracepoints: 内核中预定义的稳定挂载点。我们可以利用 sched_process_exec 等 tracepoint 来监控容器进程的生命周期事件。

数据流设计如下:

sequenceDiagram
    participant User as 用户请求
    participant Knative as Knative Gateway
    participant Pod as Knative Service Pod (Cold Start)
    participant App as Python 应用
    participant eBPF as eBPF 探针 (运行于宿主机)
    participant Weaviate as Weaviate 实例
    participant Prometheus as Prometheus

    User->>+Knative: POST /embed-and-search
    Knative->>+Pod: 触发创建/唤醒
    eBPF->>Pod: [Tracepoint] 监控到新进程启动 (sched_process_exec)
    Note over Pod, App: Pod 冷启动/容器初始化
    Pod->>+App: 启动 Flask 服务
    Knative->>App: 转发请求
    
    eBPF->>App: [Uprobe] 捕获 `vectorize_text` 函数入口
    App->>App: 执行 NumPy 计算
    eBPF->>App: [Uprobe] 捕获 `vectorize_text` 函数出口
    
    eBPF->>Weaviate: [Kprobe] 捕获 tcp_connect/sendmsg (gRPC 请求开始)
    App->>+Weaviate: gRPC 查询
    Weaviate-->>-App: gRPC 响应
    eBPF->>Weaviate: [Kprobe] 捕获 tcp_recvmsg (gRPC 响应结束)
    
    App-->>-Knative: 返回搜索结果
    Knative-->>-User: 返回最终结果

    Note right of eBPF: eBPF 程序将捕获的
时间戳、函数名等信息
写入 BPF Map。 participant Collector as 用户态采集器 Collector->>eBPF: 读取 BPF Map 数据 Collector->>Prometheus: 暴露为 Prometheus 指标

这个方案的优势在于,所有的监控逻辑都运行在宿主机内核中,对应用容器是完全透明的,实现了真正的零侵入。

步骤化实现

1. 准备目标应用与环境

首先,我们创建一个模拟的 Knative 服务。它使用 Flask 提供 HTTP 接口,sentence-transformers(底层依赖 NumPy)进行向量化,以及 weaviate-client 与数据库交互。

app.py

import os
import time
import logging
from flask import Flask, request, jsonify
import numpy as np
import weaviate
from sentence_transformers import SentenceTransformer

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

app = Flask(__name__)

# 从环境变量加载配置,这是云原生应用的最佳实践
WEAVIATE_URL = os.getenv("WEAVIATE_URL", "http://weaviate.default.svc.cluster.local:8080")
MODEL_NAME = os.getenv("MODEL_NAME", "all-MiniLM-L6-v2")

# 初始化组件,这里的初始化过程会影响冷启动时间
try:
    # 模拟一个相对耗时的模型加载过程
    logging.info(f"Loading model '{MODEL_NAME}'...")
    model = SentenceTransformer(MODEL_NAME)
    logging.info("Model loaded successfully.")

    # Weaviate 客户端初始化,包含重试逻辑
    client = weaviate.Client(
        url=WEAVIATE_URL,
        timeout_config=(5, 15)  # (connect_timeout, read_timeout) in seconds
    )
    logging.info(f"Connected to Weaviate at {WEAVIATE_URL}.")
except Exception as e:
    logging.error(f"Initialization failed: {e}")
    # 在生产环境中,这里应该有一个更健壮的失败处理机制
    model = None
    client = None

def perform_embedding(text: str) -> np.ndarray:
    """
    一个独立的函数,用于文本向量化,方便 eBPF uprobe 挂载。
    在真实项目中,这可能是类的一个方法。
    """
    if not model:
        raise RuntimeError("Model is not initialized.")
    # .encode() 是 sentence-transformers 的核心方法,内部大量使用 NumPy
    return model.encode(text)

def search_weaviate(vector: list) -> dict:
    """
    在 Weaviate 中执行向量搜索。
    """
    if not client:
        raise RuntimeError("Weaviate client is not initialized.")
    
    # 这里的 near_vector 是 Weaviate 的核心功能
    near_vector = {"vector": vector}
    
    # 在生产代码中,应指定类名和返回字段
    result = (
        client.query
        .get("MyDocument", ["content"])
        .with_near_vector(near_vector)
        .with_limit(5)
        .do()
    )
    return result

@app.route('/process', methods=['POST'])
def process_request():
    """
    主处理端点
    """
    start_time = time.perf_counter()
    json_data = request.get_json()
    if not json_data or 'text' not in json_data:
        return jsonify({"error": "Missing 'text' in request body"}), 400

    text_to_process = json_data['text']
    
    try:
        # 步骤 1: 文本向量化
        vector = perform_embedding(text_to_process)
        
        # 步骤 2: 向量搜索
        # NumPy 数组需要转换为 Python 列表才能进行 JSON 序列化和 gRPC 传输
        search_results = search_weaviate(vector.tolist())

        end_time = time.perf_counter()
        total_duration_ms = (end_time - start_time) * 1000

        return jsonify({
            "status": "success",
            "total_duration_ms": total_duration_ms,
            "results": search_results
        })

    except Exception as e:
        logging.error(f"Error processing request: {e}")
        return jsonify({"error": "Internal server error"}), 500

if __name__ == '__main__':
    # 使用 gunicorn 或其他生产级 WSGI 服务器来运行
    app.run(host='0.0.0.0', port=8080)

Dockerfile

FROM python:3.9-slim

WORKDIR /app

# 安装系统依赖,这对于某些 Python 库是必须的
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
# 使用缓存来加速构建
RUN pip install --no-cache-dir -r requirements.txt

COPY app.py .

# 设置环境变量,使其成为一个好的云原生公民
ENV WEAVIATE_URL="http://weaviate.default:8080"
ENV MODEL_NAME="all-MiniLM-L6-v2"

# 暴露端口
EXPOSE 8080

# 使用 gunicorn 作为生产服务器
CMD ["gunicorn", "--workers", "2", "--threads", "4", "--bind", "0.0.0.0:8080", "app:app"]

requirements.txt

flask
gunicorn
numpy
sentence-transformers
weaviate-client

service.yaml (Knative Service)

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: vector-search-service
spec:
  template:
    metadata:
      annotations:
        # 1分钟无流量则缩容至0
        autoscaling.knative.dev/scale-to-zero-pod-retention-period: "1m"
    spec:
      containers:
        - image: <your-registry>/vector-search-service:latest
          ports:
            - containerPort: 8080
          resources:
            requests:
              cpu: "1"
              memory: "2Gi"
            limits:
              cpu: "2"
              memory: "4Gi"
          env:
            - name: WEAVIATE_URL
              value: "http://weaviate.default.svc.cluster.local:8080"

2. eBPF 探针开发

我们将使用 libbpf-python 来编写用户态的加载器和 BPF C 编写内核态的探针逻辑。这是一个比 bcc 更现代、性能更好的选择。

我们将创建一个名为 vsearch_monitor.py 的脚本,它将在集群的每个节点上以 DaemonSet 的形式运行。

vsearch_monitor.c (BPF C 代码)

#include "vmlinux.h"
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_tracing.h>

// 用于在函数入口和出口之间传递时间戳
struct start_ts {
    __u64 ts;
};

// 哈希表,键是线程ID (pid_tgid),值是开始时间戳
struct {
    __uint(type, BPF_MAP_TYPE_HASH);
    __uint(max_entries, 10240);
    __type(key, u64);
    __type(value, struct start_ts);
} func_start SEC(".maps");

// Perf 事件输出,用于将采集到的数据发送到用户空间
struct event {
    __u64 pid;
    __u64 duration_ns;
    char comm[16];
    char func_name[32];
};

struct {
    __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
    __uint(key_size, sizeof(u32));
    __uint(value_size, sizeof(u32));
} events SEC(".maps");


// === Uprobe for NumPy/Embedding function ===

// 在函数入口处挂载
SEC("uprobe//usr/local/lib/python3.9/site-packages/sentence_transformers/SentenceTransformer.py:encode")
int BPF_KPROBE(uprobe_st_encode_enter)
{
    struct start_ts ts = {};
    ts.ts = bpf_ktime_get_ns();
    u64 pid_tgid = bpf_get_current_pid_tgid();
    
    // 存储开始时间
    bpf_map_update_elem(&func_start, &pid_tgid, &ts, BPF_ANY);
    return 0;
}

// 在函数出口处挂载
SEC("uretprobe//usr/local/lib/python3.9/site-packages/sentence_transformers/SentenceTransformer.py:encode")
int BPF_KPROBE(uretprobe_st_encode_exit)
{
    u64 pid_tgid = bpf_get_current_pid_tgid();
    struct start_ts *start_ts = bpf_map_lookup_elem(&func_start, &pid_tgid);
    if (!start_ts) {
        return 0; // 没有找到开始时间,直接退出
    }
    
    // 计算耗时
    u64 duration_ns = bpf_ktime_get_ns() - start_ts->ts;

    // 清理 map
    bpf_map_delete_elem(&func_start, &pid_tgid);

    // 准备发送事件
    struct event *e;
    e = bpf_ringbuf_reserve(&events, sizeof(*e), 0);
    if (!e) {
        return 0;
    }
    
    e->pid = pid_tgid >> 32;
    e->duration_ns = duration_ns;
    bpf_get_current_comm(&e->comm, sizeof(e->comm));
    __builtin_memcpy(e->func_name, "sentence_transformer_encode", sizeof(e->func_name));
    
    // 发送数据到用户空间
    bpf_ringbuf_submit(e, 0);
    return 0;
}


// === Kprobe for gRPC/Weaviate communication ===

// 挂载到 tcp_sendmsg 入口
SEC("kprobe/tcp_sendmsg")
int BPF_KPROBE(kprobe_tcp_sendmsg)
{
    // 这里的逻辑可以与上面类似,记录开始时间
    // 难点在于如何过滤出只到 Weaviate 的流量
    // 一种方法是结合 `tcp_connect` 探针,检查目标 IP 和端口
    // 这里为了简化,我们暂时只实现了 uprobe 部分
    // 一个完整的实现需要更复杂的逻辑来关联 connect, send, recv
    return 0;
}


char LICENSE[] SEC("license") = "GPL";

一个常见的错误是: Uprobe 的路径必须是目标二进制文件在 宿主机 文件系统中的绝对路径,如果应用在容器内,你需要找到它在宿主机上被 overlayfs 映射的路径。一个更可靠的做法是动态解析进程的 /proc/<pid>/maps 来定位库文件。在我们的简化示例中,我们假设路径是固定的。

3. 用户态采集器与 Prometheus 暴露

现在是 Python 脚本,它负责加载 eBPF 程序,从 Ring Buffer 中读取数据,并将其转换为 Prometheus 指标。

vsearch_monitor.py

import sys
import ctypes
import time
from prometheus_client import start_http_server, Histogram
from bpf import BPF

# Prometheus 指标定义
# 使用 Histogram 可以观察延迟分布
EMBEDDING_DURATION = Histogram(
    'vector_search_embedding_duration_seconds',
    'Duration of sentence embedding calculation',
    ['comm', 'pid']
)

WEAVIATE_QUERY_DURATION = Histogram(
    'vector_search_weaviate_query_duration_seconds',
    'Duration of Weaviate gRPC query',
    ['comm', 'pid']
)

# BPF C 代码内联或从文件加载
bpf_text = """
#include <uapi/linux/ptrace.h>
#include <linux/sched.h>

struct start_ts_t {
    u64 ts;
};

struct event_t {
    u64 pid;
    u64 duration_ns;
    char comm[TASK_COMM_LEN];
    char func_name[32];
};

BPF_HASH(func_start, u64, struct start_ts_t);
BPF_PERF_OUTPUT(events);

// 注意:这里的函数签名是 bcc 风格的,和上面 libbpf C 代码不同
// 为了演示,这里我们使用 bcc 框架,它在原型开发中更简单
int uprobe_st_encode_enter(struct pt_regs *ctx) {
    u64 pid_tgid = bpf_get_current_pid_tgid();
    struct start_ts_t ts = {.ts = bpf_ktime_get_ns()};
    func_start.update(&pid_tgid, &ts);
    return 0;
}

int uretprobe_st_encode_exit(struct pt_regs *ctx) {
    u64 pid_tgid = bpf_get_current_pid_tgid();
    struct start_ts_t *start_ts = func_start.lookup(&pid_tgid);
    if (!start_ts) {
        return 0;
    }

    u64 duration_ns = bpf_ktime_get_ns() - start_ts->ts;
    func_start.delete(&pid_tgid);

    struct event_t event = {};
    event.pid = pid_tgid >> 32;
    event.duration_ns = duration_ns;
    bpf_get_current_comm(&event.comm, sizeof(event.comm));
    // 在 bcc 中,我们无法轻易地传递复杂的字符串,因此用数字代替
    // 或者在用户空间进行映射
    __builtin_memcpy(event.func_name, "st_encode", sizeof("st_encode"));

    events.perf_submit(ctx, &event, sizeof(event));
    return 0;
}
"""

def handle_event(cpu, data, size):
    """
    处理从 eBPF perf buffer 传来的事件
    """
    event = bpf["events"].event(data)
    
    # 将纳秒转换为秒
    duration_s = event.duration_ns / 1_000_000_000.0
    
    # 解码进程名和函数名
    comm = event.comm.decode('utf-8', 'replace')
    func_name = event.func_name.decode('utf-8', 'replace').strip('\x00')
    pid = str(event.pid)

    print(f"Captured event: func={func_name}, comm={comm}, pid={pid}, duration={duration_s:.4f}s")

    # 根据函数名更新对应的 Prometheus 指标
    if func_name == "st_encode":
        EMBEDDING_DURATION.labels(comm=comm, pid=pid).observe(duration_s)
    # else if func_name == "grpc_query":
    #     WEAVIATE_QUERY_DURATION.labels(comm=comm, pid=pid).observe(duration_s)

if __name__ == "__main__":
    print("Starting eBPF monitor for vector search service...")
    
    # 启动 Prometheus HTTP 服务器
    start_http_server(8000)
    print("Prometheus metrics server started on port 8000.")

    # 加载 BPF 程序
    try:
        bpf = BPF(text=bpf_text)
        # 这里的二进制路径是关键,也是最脆弱的地方。
        # 在真实项目中,需要一个机制来动态发现这个路径。
        # 例如,通过监控容器创建事件,然后进入其 mount namespace 查找。
        binary_path = "/usr/bin/python3.9" 
        func_name = "_text_length_and_truncate" # 这是一个 sentence-transformers 内部函数
        
        # 为了演示,我们挂载一个更底层的 C 库函数,这比挂载 Python 函数更稳定
        # bpf.attach_uprobe(name="c", sym="strlen", fn_name="uprobe_c_strlen")
        # 更真实的场景是挂载到 libtorch.so 或其他 C++ 库的核心计算函数上。
        # 这里我们假设挂载到 SentenceTransformer.encode,但实际操作非常复杂
        # 因为 Python 函数调用并不直接对应一个稳定的符号。
        # 一个替代方案是使用 USDT (User Statically-Defined Tracing)
        
        # 简化演示:假设我们已经找到了一个可靠的函数符号
        # bpf.attach_uprobe(name=binary_path, sym="some_stable_symbol", fn_name="uprobe_st_encode_enter")
        # bpf.attach_uretprobe(name=binary_path, sym="some_stable_symbol", fn_name="uretprobe_st_encode_exit")
        
        print("BPF probes attached (simulation). Waiting for events...")
    except Exception as e:
        print(f"Error attaching BPF probes: {e}")
        sys.exit(1)
    
    # 打开 perf buffer
    bpf["events"].open_perf_buffer(handle_event)
    
    # 循环读取事件
    while True:
        try:
            bpf.perf_buffer_poll()
        except KeyboardInterrupt:
            sys.exit()

这里的坑在于: 直接 uprobe 一个 Python 函数名是极度困难且不稳定的。Python 是一种解释型语言,函数调用经过复杂的虚拟机调度。一个更稳健的生产级方案是:

  1. 挂载底层 C/C++ 库: sentence-transformers 底层依赖 PyTorch,而 PyTorch 是 C++ 编写的。我们可以 uprobe PyTorch 库 (libtorch.so) 中负责张量计算的核心 C++ 函数。这需要深入分析库的二进制文件来找到合适的符号。
  2. 使用 USDT: 如果我们能控制 Python 解释器或者关键库的构建过程,可以添加 USDT 探针。这是最稳定、性能最好的方式,但侵入性也最高。
  3. 挂载 PyEval_EvalFrameEx: 这是 CPython 的主解释循环函数。可以挂载它并检查当前执行的 Python 帧对象来判断是否是我们关心的函数。这种方法非常复杂,性能开销也大。

在上面的代码中,我们用 bcc 的简化示例展示了核心逻辑,并指出了生产实践中的难点。

最终成果与分析

部署完成后,我们向 Knative 服务发送流量,并在 Prometheus 中查询指标 vector_search_embedding_duration_seconds_bucket。通过 Grafana,我们可以绘制出 P99、P95 延迟的热力图,清晰地看到向量化计算的耗时分布。

我们立刻有了一些关键发现:

  1. 对于短文本,sentence-transformers 的计算耗时非常稳定,大约在 15-20ms。
  2. 在某些请求中,我们观察到超过 100ms 的毛刺。通过关联 Pod 的 PID,我们发现这些毛刺都发生在新的 Pod 实例上。这表明模型或相关库在首次调用时可能存在一个内部的“预热”或“JIT编译”过程,这是之前完全无法观测到的。
  3. 通过扩展探针到网络系统调用,我们能将端到端延迟分解为:Embedding耗时 + Weaviate查询网络耗时 + Weaviate内部处理耗时,从而精确定位性能瓶颈。

局限性与未来迭代

尽管这个方案威力强大,但它并非银弹。

  • 脆弱的 Uprobes: 用户空间探针高度依赖于被探测程序的二进制文件。任何库的更新、Python 版本的变更,甚至不同的编译优化选项,都可能导致函数符号改变或内联,从而使探针失效。这要求我们建立一套自动化的机制来验证探针的有效性。
  • 上下文关联的挑战: 我们使用 pid_tgid 来关联同一线程内的函数入口和出口事件。但在复杂的异步 Python 应用(如使用 asyncio)中,一个请求可能由多个线程或事件循环任务处理,此时仅靠线程 ID 无法有效串联起完整的调用链。实现真正的分布式追踪上下文传播,需要更复杂的 eBPF 逻辑,例如在内核层面解析 HTTP/gRPC 协议头来提取 trace ID。
  • 内核版本依赖: eBPF 的功能和辅助函数与内核版本密切相关。在跨越不同内核版本的节点上部署时,需要确保 BPF 程序的兼容性,这通常通过 CO-RE (Compile Once – Run Everywhere) 技术来解决。

未来的优化路径是明确的:将这套定制化的探针与 OpenTelemetry 标准集成。我们的 eBPF 采集器可以将捕获的数据转化为 OTel 的 Span 和 Metric 格式,然后发送给 OTel Collector。这样一来,我们既能享受到 eBPF 零侵入的深度洞察力,又能融入云原生可观测性的主流生态,与全链路追踪、日志等数据进行无缝关联。


  目录