为 LlamaIndex RAG 应用构建从 Recoil 前端到 Koa 后端的全链路追踪


一个 RAG 查询的响应耗时 5 秒。瓶颈在哪?是向量检索太慢,上下文构建耗时过长,还是 LLM 生成首个 token 的延迟太高?如果 RAG 管道对我们来说是个黑盒,那么优化就无从谈起。在真实项目中,任何无法被度量的东西都无法被有效管理。我的目标很明确:构建一个贯穿前端交互、后端处理,并能深入到 LlamaIndex 执行细节的全链路可观测性系统。

技术选型基于实用主义和最小化复杂度的原则:

  • 后端 API: Koa。足够轻量,其基于中间件的架构非常适合注入追踪、日志等横切关注点。
  • RAG 引擎: LlamaIndex (TypeScript)。它提供了强大的 RAG 功能,更重要的是,它暴露了 CallbackManager 机制,这是我们实现深度检测的关键。
  • 前端状态管理: Recoil。它的原子化状态管理模型能够干净地处理异步数据流,特别是 LLM 的流式响应,同时还能对用户交互和数据获取的生命周期进行精细控制。
  • 可观测性标准: OpenTelemetry。这是行业标准,避免了厂商锁定,生态系统成熟。

我们将从零开始,构建一个完整的、可观测的 RAG 应用,重点不在于 RAG 应用本身的功能有多炫酷,而在于如何让它的每一个环节都变得透明。

第一步:奠定后端基础 - Koa 与 LlamaIndex 的集成

首先,我们需要一个能运行的后端服务。这个服务接收用户请求,调用 LlamaIndex 执行 RAG 查询,并将结果以流的形式返回给前端。

项目结构如下:

/rag-observable-app
|-- /backend
|   |-- /data                   # 存放原始文档
|   |-- /storage                # LlamaIndex 生成的索引文件
|   |-- instrumentation.ts      # OpenTelemetry 初始化
|   |-- server.ts               # Koa 服务器主文件
|   |-- llama.ts                # LlamaIndex 初始化与封装
|   |-- package.json
|-- /frontend
|   |-- ... (React App)

backend/llama.ts: 封装 LlamaIndex 核心逻辑

这里的关键是封装一个可复用的 getLlamaQueryEngine 函数。注意,我们暂时将 callbackManager 留空,后续会在这里注入我们的追踪逻辑。在生产环境中,模型配置、嵌入维度等都应该来自环境变量或配置文件。

// backend/llama.ts
import {
  Ollama,
  VectorStoreIndex,
  SimpleDirectoryReader,
  serviceContextFromDefaults,
  CallbackManager, // 我们将要使用的关键组件
} from "llamaindex";
import path from "path";
import { fileURLToPath } from 'url';

// 在 ES Module 中获取 __dirname 的等价物
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

const OLLAMA_BASE_URL = process.env.OLLAMA_BASE_URL || "http://localhost:11434";
const STORAGE_DIR = path.join(__dirname, "storage");
const DATA_DIR = path.join(__dirname, "data");

// 单例模式确保索引只加载一次
let queryEngineInstance: any = null;

async function initializeIndex(callbackManager?: CallbackManager) {
  console.log("Initializing LlamaIndex...");
  
  const llm = new Ollama({ model: "llama3", baseURL: OLLAMA_BASE_URL });
  
  // 创建 ServiceContext,这里是注入自定义行为的核心
  const serviceContext = serviceContextFromDefaults({
    llm,
    embedModel: llm, // 使用同一个 Ollama 实例作为嵌入模型
    callbackManager, // 注入 CallbackManager
  });

  try {
    const index = await VectorStoreIndex.init({
      nodes: await new SimpleDirectoryReader().loadData(DATA_DIR),
      serviceContext,
    });
    
    // 在真实项目中,索引的构建和加载应该分开
    // 这里为了演示方便,每次启动都重新构建
    // 生产环境应考虑持久化索引并从磁盘加载
    console.log("Index initialized successfully.");
    return index.asQueryEngine({ streaming: true });

  } catch (error) {
    console.error("Failed to initialize LlamaIndex:", error);
    // 错误处理:如果索引初始化失败,服务应该无法启动或进入降级模式
    throw new Error("LlamaIndex initialization failed.");
  }
}

export async function getLlamaQueryEngine(callbackManager?: CallbackManager) {
  if (!queryEngineInstance) {
    queryEngineInstance = await initializeIndex(callbackManager);
  }
  // 注意:如果每次请求都需要不同的 callbackManager,这里的单例模式需要调整
  // 但对于追踪来说,callbackManager 可以在服务启动时设置好
  return queryEngineInstance;
}

backend/server.ts: Koa 服务器与流式 API

我们使用 Koa 来创建一个 /api/chat/stream 路由。这个路由使用 async function* (异步生成器) 来处理 LlamaIndex 的流式输出,这是一种非常优雅的处理方式,能有效地将后端的流对接给前端。

// backend/server.ts
import Koa from 'koa';
import Router from '@koa/router';
import { koaBody } from 'koa-body';
import cors from '@koa/cors';
import { PassThrough } from 'stream';
import { getLlamaQueryEngine } from './llama.js';

const app = new Koa();
const router = new Router();

app.use(cors()); // 允许跨域
app.use(koaBody());

router.post('/api/chat/stream', async (ctx) => {
  const { query } = ctx.request.body as { query: string };

  if (!query) {
    ctx.status = 400;
    ctx.body = { error: 'Query parameter is required' };
    return;
  }

  try {
    const queryEngine = await getLlamaQueryEngine(); // 此处尚未注入追踪
    const stream = await queryEngine.query(query);

    // 设置流式响应头
    ctx.set({
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    });

    // 使用 PassThrough 流将 LlamaIndex 的异步迭代器转换为 Node.js 可读流
    const passThrough = new PassThrough();
    ctx.status = 200;
    ctx.body = passThrough;

    // 异步地将数据块写入流
    (async () => {
      for await (const chunk of stream) {
        passThrough.write(`data: ${JSON.stringify(chunk)}\n\n`);
      }
      passThrough.end();
    })();

  } catch (error) {
    console.error('Error during RAG query:', error);
    // 生产级的错误处理需要更细致,例如区分是用户输入问题还是下游服务问题
    ctx.status = 500;
    // 不直接暴露内部错误给客户端
    ctx.body = { error: 'An internal error occurred while processing your request.' };
  }
});

app.use(router.routes()).use(router.allowedMethods());

const PORT = process.env.PORT || 3001;
app.listen(PORT, () => {
  console.log(`Server listening on port ${PORT}`);
});

至此,我们有了一个功能完备但完全不透明的 RAG 服务。

第二步:注入灵魂 - 使用 OpenTelemetry 实现后端链路追踪

现在,我们开始植入可观测性。首先是在 Koa 层面。我们需要捕获所有进入的 HTTP 请求,并为每个请求创建一个父 Span。

backend/instrumentation.ts: OpenTelemetry SDK 初始化

这是整个追踪系统的起点。这段代码必须在你的应用代码加载之前执行。通常通过 node --import ./instrumentation.ts server.ts 来启动。

// backend/instrumentation.ts
import { NodeSDK } from '@opentelemetry/sdk-node';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { ConsoleSpanExporter, SimpleSpanProcessor } from '@opentelemetry/sdk-trace-node';

// 配置服务资源信息,这些信息会附加到所有遥测数据上
const resource = new Resource({
  [SemanticResourceAttributes.SERVICE_NAME]: 'rag-backend-service',
  [SemanticResourceAttributes.SERVICE_VERSION]: '1.0.0',
});

// 配置 Exporter,决定追踪数据发送到哪里。
// 在开发中,ConsoleSpanExporter 非常好用,可以直接在控制台看到输出。
// 生产环境中,应替换为 OTLPTraceExporter,指向 Jaeger, Zipkin, 或其他兼容的收集器。
const traceExporter = new ConsoleSpanExporter();
// const traceExporter = new OTLPTraceExporter({
//   url: 'http://localhost:4318/v1/traces', // OTel Collector 的地址
// });

const spanProcessor = new SimpleSpanProcessor(traceExporter);

const sdk = new NodeSDK({
  resource,
  spanProcessor,
  // 自动仪表化 Node.js 核心模块和流行库
  // @opentelemetry/instrumentation-koa 会自动为每个请求创建 Span
  instrumentations: [getNodeAutoInstrumentations()],
});

// 优雅地关闭 SDK
process.on('SIGTERM', () => {
  sdk.shutdown()
    .then(() => console.log('Tracing terminated'))
    .catch((error) => console.log('Error terminating tracing', error))
    .finally(() => process.exit(0));
});

// 启动 SDK
sdk.start();
console.log('OpenTelemetry SDK started.');

启动命令需要修改为:

node --import ./instrumentation.ts server.ts

现在,每次调用 /api/chat/stream,Koa 的自动仪表化库就会创建一个名为 POST /api/chat/stream 的 Span。但我们更关心的是这个 Span 内部发生了什么。

第三步:深入黑盒 - 自定义 LlamaIndex Callback 来创建子 Span

这是本文的核心。我们需要创建一个自定义的 CallbackHandler,它遵循 LlamaIndex 的接口,并在各个关键事件(如检索开始/结束,LLM 调用开始/结束)上创建和关闭 OpenTelemetry Span。

// backend/llama.ts (修改和增加的部分)
import {
  // ... 其他 imports
  CallbackManager,
  Event,
  CBEventType,
} from "llamaindex";
import opentelemetry, { SpanStatusCode, Span } from '@opentelemetry/api';

// 获取一个 Tracer 实例
const tracer = opentelemetry.trace.getTracer('llamaindex-instrumentation');

class OpenTelemetryCallbackHandler {
  // 使用 Map 来存储正在进行的 Span,key 是事件 ID
  private spanMap: Map<string, Span> = new Map();

  // 定义事件类型到 Span 名称的映射
  private eventTypeToSpanName: Record<CBEventType, string> = {
    [CBEventType.CHUNKING]: "llamaindex-chunking",
    [CBEventType.NODE_PARSING]: "llamaindex-node-parsing",
    [CBEventType.EMBEDDING]: "llamaindex-embedding",
    [CBEventType.LLM]: "llamaindex-llm-call",
    [CBEventType.QUERY]: "llamaindex-query",
    [CBEventType.RETRIEVE]: "llamaindex-retrieve",
    [CBEventType.SYNTHESIZE]: "llamaindex-synthesize",
    [CBEventType.TREE]: "llamaindex-tree-generation",
    [CBEventType.SUB_QUESTION]: "llamaindex-sub-question",
    [CBEventType.TEMPLATING]: "llamaindex-templating",
    [CBEventType.FUNCTION_CALL]: "llamaindex-function-call",
    [CBEventType.REGISTER_EVENT]: "llamaindex-register-event",
    [CBEventType.AGENT_STEP]: "llamaindex-agent-step",
    [CBEventType.AGENT_SEARCH]: "llamaindex-agent-search",
  }

  startTrace(event: Event) {
    if (!event.id) return;
    
    // 从当前上下文中获取父 Span (即 Koa 中间件创建的那个 Span)
    const parentSpan = opentelemetry.trace.getActiveSpan();
    const ctx = parentSpan ? opentelemetry.trace.setSpan(opentelemetry.context.active(), parentSpan) : undefined;

    const spanName = this.eventTypeToSpanName[event.type] || `llamaindex-unknown-${event.type}`;
    
    const span = tracer.startSpan(spanName, undefined, ctx);
    
    // 为 Span 添加丰富的属性(attributes),这对于后续分析至关重要
    span.setAttribute('llamaindex.event.type', event.type);
    span.setAttribute('llamaindex.event.id', event.id);

    // 如果是 LLM 调用,记录模型信息
    if (event.type === CBEventType.LLM && event.detail?.messages) {
      const model = event.detail.messages[0]?.additionalKwargs?.model;
      if(model) {
        span.setAttribute('llm.model', model);
      }
    }

    this.spanMap.set(event.id, span);
  }

  endTrace(event: Event) {
    if (!event.id) return;
    const span = this.spanMap.get(event.id);
    if (span) {
      if (event.detail?.error) {
        span.recordException(event.detail.error);
        span.setStatus({ code: SpanStatusCode.ERROR, message: event.detail.error.message });
      } else {
        span.setStatus({ code: SpanStatusCode.OK });
      }
      span.end();
      this.spanMap.delete(event.id);
    }
  }
}

// 修改 getLlamaQueryEngine 以使用我们的 CallbackHandler
export async function getLlamaQueryEngine() {
  if (!queryEngineInstance) {
    const callbackManager = new CallbackManager();
    const otelCallbackHandler = new OpenTelemetryCallbackHandler();

    // 注册回调
    callbackManager.on(CBEventType.QUERY, (event) => otelCallbackHandler.startTrace(event));
    callbackManager.on(CBEventType.QUERY, (event) => otelCallbackHandler.endTrace(event), true);
    
    callbackManager.on(CBEventType.RETRIEVE, (event) => otelCallbackHandler.startTrace(event));
    callbackManager.on(CBEventType.RETRIEVE, (event) => otelCallbackHandler.endTrace(event), true);

    callbackManager.on(CBEventType.SYNTHESIZE, (event) => otelCallbackHandler.startTrace(event));
    callbackManager.on(CBEventType.SYNTHESIZE, (event) => otelCallbackHandler.endTrace(event), true);
    
    callbackManager.on(CBEventType.LLM, (event) => otelCallbackHandler.startTrace(event));
    callbackManager.on(CBEventType.LLM, (event) => otelCallbackHandler.endTrace(event), true);

    queryEngineInstance = await initializeIndex(callbackManager);
  }
  return queryEngineInstance;
}

// initializeIndex 函数也需要接受 callbackManager
// ... (之前的 initializeIndex 代码) ...

这里的逻辑非常清晰:我们创建了一个 OpenTelemetryCallbackHandler 类,它监听 LlamaIndex 的内部事件。当一个查询开始时 (CBEventType.QUERY start),我们创建一个名为 llamaindex-query 的 Span。在这个查询内部,当检索步骤开始时 (CBEventType.RETRIEVE start),我们又创建一个子 Span llamaindex-retrieve。当这些步骤结束时,我们关闭对应的 Span。通过这种方式,我们将 LlamaIndex 的执行流程完美地映射到了 OpenTelemetry 的分布式追踪模型中。

现在,再次运行应用并发起一个查询,你会在控制台看到这样的输出:

// ConsoleSpanExporter 的简化输出
TraceId:      a1b2c3d4...
SpanId:       e5f6g7h8...
ParentSpanId: d4e5f6g7...
Name:         llamaindex-retrieve
Status:       Ok
...
TraceId:      a1b2c3d4...
SpanId:       i9j0k1l2...
ParentSpanId: d4e5f6g7...
Name:         llamaindex-synthesize
Status:       Ok
...
TraceId:      a1b2c3d4...
SpanId:       d4e5f6g7...
ParentSpanId: c3d4e5f6...
Name:         llamaindex-query
Status:       Ok
...
TraceId:      a1b2c3d4...
SpanId:       c3d4e5f6...
ParentSpanId: (undefined)
Name:         POST /api/chat/stream
Status:       Ok
...

使用 Jaeger UI 查看,会得到一个清晰的火焰图,显示出 POST /api/chat/stream 调用中,llamaindex-query 占用了多少时间,而 llamaindex-query 内部,又是 retrievesynthesize 各自的耗时。我们的黑盒被打开了。

sequenceDiagram
    participant C as Client
    participant K as Koa Server
    participant L as LlamaIndex
    participant O as OpenTelemetry

    C->>+K: POST /api/chat/stream (query)
    O-->>K: instrumentation.koa starts root span `POST /...`
    K->>+L: queryEngine.query(query)
    O-->>L: CallbackManager starts child span `llamaindex-query`
    L->>+L: Retrieval Step
    O-->>L: CallbackManager starts sub-child span `llamaindex-retrieve`
    L-->>-L: Retrieval Done
    O-->>L: CallbackManager ends `llamaindex-retrieve`
    L->>+L: Synthesis Step (LLM call)
    O-->>L: CallbackManager starts sub-child span `llamaindex-synthesize`
    L-->>-L: Synthesis Done
    O-->>L: CallbackManager ends `llamaindex-synthesize`
    L-->>-K: Stream Chunks...
    O-->>L: CallbackManager ends `llamaindex-query`
    K-->>-C: Stream Response
    O-->>K: instrumentation.koa ends root span `POST /...`

第四步:构建前端 - 使用 Recoil 优雅地处理流式状态

前端的挑战在于如何处理流式响应并将其平滑地展示给用户,同时保持应用状态的一致性。Recoil 在这方面表现出色。

定义 Recoil 状态

// frontend/src/state/atoms.ts
import { atom } from 'recoil';

export interface ChatMessage {
  id: string;
  role: 'user' | 'assistant';
  content: string;
}

// 存储整个对话历史
export const chatHistoryState = atom<ChatMessage[]>({
  key: 'chatHistoryState',
  default: [],
});

// 存储当前正在输入的查询
export const queryInputState = atom<string>({
  key: 'queryInputState',
  default: '',
});

// 跟踪查询状态:idle, loading, streaming, error
export const queryStatusState = atom<'idle' | 'loading' | 'streaming' | 'error'>({
  key: 'queryStatusState',
  default: 'idle',
});

实现核心组件和交互逻辑

// frontend/src/components/Chat.tsx
import React from 'react';
import { useRecoilState, useSetRecoilState } from 'recoil';
import { chatHistoryState, queryInputState, queryStatusState } from '../state/atoms';
import { v4 as uuidv4 } from 'uuid';

export const Chat = () => {
  const [query, setQuery] = useRecoilState(queryInputTate);
  const [history, setHistory] = useRecoilState(chatHistoryState);
  const setStatus = useSetRecoilState(queryStatusState);

  const handleSubmit = async (e: React.FormEvent) => {
    e.preventDefault();
    if (!query.trim()) return;

    // 1. 更新对话历史和状态
    const userMessage = { id: uuidv4(), role: 'user', content: query };
    const assistantMessageId = uuidv4();
    setHistory(prev => [...prev, userMessage, { id: assistantMessageId, role: 'assistant', content: '' }]);
    setStatus('loading');
    setQuery('');

    try {
      const response = await fetch('http://localhost:3001/api/chat/stream', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ query }),
      });

      if (!response.ok || !response.body) {
        throw new Error(`Request failed with status ${response.status}`);
      }
      
      setStatus('streaming');
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      
      // 2. 处理流式响应
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        const rawChunk = decoder.decode(value);
        // SSE (Server-Sent Events) 格式解析
        const lines = rawChunk.split('\n').filter(line => line.startsWith('data: '));
        for (const line of lines) {
            const jsonString = line.substring(6);
            const parsedChunk = JSON.parse(jsonString);

            // 3. 实时更新 Recoil state
            setHistory(prevHistory => 
                prevHistory.map(msg => 
                    msg.id === assistantMessageId 
                        ? { ...msg, content: msg.content + parsedChunk.response }
                        : msg
                )
            );
        }
      }

    } catch (error) {
      console.error("Streaming failed:", error);
      setStatus('error');
      setHistory(prev => prev.map(msg => 
          msg.id === assistantMessageId 
              ? { ...msg, content: 'Sorry, an error occurred.' }
              : msg
      ));
    } finally {
      setStatus('idle');
    }
  };

  // ... JSX for rendering chat UI
};

这段代码展示了 Recoil 的威力:

  1. 提交查询时,我们立即更新 chatHistoryStatequeryStatusState,UI 几乎是瞬时响应。
  2. fetch 请求的 while 循环中,每当接收到一个新的数据块,我们都通过 setHistory 更新助理的回复内容。Recoil 会高效地只重新渲染需要更新的部分。
  3. 整个异步流程的状态(loading, streaming, error, idle)都由 queryStatusState 这个原子精确管理,UI 的不同状态(如禁用输入框)可以轻易地与之绑定。

虽然我们没有在前端实现完整的 OpenTelemetry 追踪,但这是逻辑上的下一步。通过 @opentelemetry/instrumentation-fetchfetch 请求会自动携带 traceparent 头,将前端用户操作和后端处理链路无缝地连接起来,形成一个完整的、端到端的 trace。

方案局限性与未来展望

这个实现虽然打通了全链路追踪,但在生产环境中还有几个需要注意的地方:

  1. Exporter 配置: ConsoleSpanExporter 仅适用于开发。生产系统必须使用 OTLPTraceExporter 将数据发送到 OpenTelemetry Collector,再由 Collector 进行处理、采样并转发到后端存储如 Jaeger 或 Prometheus。
  2. 采样策略: 在高流量系统中,追踪所有请求的成本是巨大的。必须配置采样策略,例如基于头部的采样或尾部采样,只记录一部分有代表性的请求或所有错误的请求。
  3. 上下文传播: 当前方案依赖于 LlamaIndex 的 CallbackManager。如果 RAG 管道中包含其他异步操作(如调用外部 API),需要确保 OpenTelemetry 的 Context 在这些异步边界之间正确传播,否则追踪链会断裂。
  4. 指标与日志的关联: 真正的可观测性是 Traces, Metrics, Logs 的结合。下一步应该在关键代码路径中记录带有 trace_idspan_id 的结构化日志,并上报关键业务指标(如 token 生成速度、检索命中率),从而实现三者的关联分析。

通过这套机制,我们成功地将一个原本模糊的 RAG 执行过程,变成了一个可以被精确度量和分析的系统。当下一个性能问题出现时,我们不再需要猜测,而是可以直接通过追踪数据定位到具体的瓶颈,无论是代码逻辑、模型性能还是基础设施问题。


  目录