一个 RAG 查询的响应耗时 5 秒。瓶颈在哪?是向量检索太慢,上下文构建耗时过长,还是 LLM 生成首个 token 的延迟太高?如果 RAG 管道对我们来说是个黑盒,那么优化就无从谈起。在真实项目中,任何无法被度量的东西都无法被有效管理。我的目标很明确:构建一个贯穿前端交互、后端处理,并能深入到 LlamaIndex 执行细节的全链路可观测性系统。
技术选型基于实用主义和最小化复杂度的原则:
- 后端 API: Koa。足够轻量,其基于中间件的架构非常适合注入追踪、日志等横切关注点。
- RAG 引擎: LlamaIndex (TypeScript)。它提供了强大的 RAG 功能,更重要的是,它暴露了
CallbackManager
机制,这是我们实现深度检测的关键。 - 前端状态管理: Recoil。它的原子化状态管理模型能够干净地处理异步数据流,特别是 LLM 的流式响应,同时还能对用户交互和数据获取的生命周期进行精细控制。
- 可观测性标准: OpenTelemetry。这是行业标准,避免了厂商锁定,生态系统成熟。
我们将从零开始,构建一个完整的、可观测的 RAG 应用,重点不在于 RAG 应用本身的功能有多炫酷,而在于如何让它的每一个环节都变得透明。
第一步:奠定后端基础 - Koa 与 LlamaIndex 的集成
首先,我们需要一个能运行的后端服务。这个服务接收用户请求,调用 LlamaIndex 执行 RAG 查询,并将结果以流的形式返回给前端。
项目结构如下:
/rag-observable-app
|-- /backend
| |-- /data # 存放原始文档
| |-- /storage # LlamaIndex 生成的索引文件
| |-- instrumentation.ts # OpenTelemetry 初始化
| |-- server.ts # Koa 服务器主文件
| |-- llama.ts # LlamaIndex 初始化与封装
| |-- package.json
|-- /frontend
| |-- ... (React App)
backend/llama.ts
: 封装 LlamaIndex 核心逻辑
这里的关键是封装一个可复用的 getLlamaQueryEngine
函数。注意,我们暂时将 callbackManager
留空,后续会在这里注入我们的追踪逻辑。在生产环境中,模型配置、嵌入维度等都应该来自环境变量或配置文件。
// backend/llama.ts
import {
Ollama,
VectorStoreIndex,
SimpleDirectoryReader,
serviceContextFromDefaults,
CallbackManager, // 我们将要使用的关键组件
} from "llamaindex";
import path from "path";
import { fileURLToPath } from 'url';
// 在 ES Module 中获取 __dirname 的等价物
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const OLLAMA_BASE_URL = process.env.OLLAMA_BASE_URL || "http://localhost:11434";
const STORAGE_DIR = path.join(__dirname, "storage");
const DATA_DIR = path.join(__dirname, "data");
// 单例模式确保索引只加载一次
let queryEngineInstance: any = null;
async function initializeIndex(callbackManager?: CallbackManager) {
console.log("Initializing LlamaIndex...");
const llm = new Ollama({ model: "llama3", baseURL: OLLAMA_BASE_URL });
// 创建 ServiceContext,这里是注入自定义行为的核心
const serviceContext = serviceContextFromDefaults({
llm,
embedModel: llm, // 使用同一个 Ollama 实例作为嵌入模型
callbackManager, // 注入 CallbackManager
});
try {
const index = await VectorStoreIndex.init({
nodes: await new SimpleDirectoryReader().loadData(DATA_DIR),
serviceContext,
});
// 在真实项目中,索引的构建和加载应该分开
// 这里为了演示方便,每次启动都重新构建
// 生产环境应考虑持久化索引并从磁盘加载
console.log("Index initialized successfully.");
return index.asQueryEngine({ streaming: true });
} catch (error) {
console.error("Failed to initialize LlamaIndex:", error);
// 错误处理:如果索引初始化失败,服务应该无法启动或进入降级模式
throw new Error("LlamaIndex initialization failed.");
}
}
export async function getLlamaQueryEngine(callbackManager?: CallbackManager) {
if (!queryEngineInstance) {
queryEngineInstance = await initializeIndex(callbackManager);
}
// 注意:如果每次请求都需要不同的 callbackManager,这里的单例模式需要调整
// 但对于追踪来说,callbackManager 可以在服务启动时设置好
return queryEngineInstance;
}
backend/server.ts
: Koa 服务器与流式 API
我们使用 Koa 来创建一个 /api/chat/stream
路由。这个路由使用 async function*
(异步生成器) 来处理 LlamaIndex 的流式输出,这是一种非常优雅的处理方式,能有效地将后端的流对接给前端。
// backend/server.ts
import Koa from 'koa';
import Router from '@koa/router';
import { koaBody } from 'koa-body';
import cors from '@koa/cors';
import { PassThrough } from 'stream';
import { getLlamaQueryEngine } from './llama.js';
const app = new Koa();
const router = new Router();
app.use(cors()); // 允许跨域
app.use(koaBody());
router.post('/api/chat/stream', async (ctx) => {
const { query } = ctx.request.body as { query: string };
if (!query) {
ctx.status = 400;
ctx.body = { error: 'Query parameter is required' };
return;
}
try {
const queryEngine = await getLlamaQueryEngine(); // 此处尚未注入追踪
const stream = await queryEngine.query(query);
// 设置流式响应头
ctx.set({
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
});
// 使用 PassThrough 流将 LlamaIndex 的异步迭代器转换为 Node.js 可读流
const passThrough = new PassThrough();
ctx.status = 200;
ctx.body = passThrough;
// 异步地将数据块写入流
(async () => {
for await (const chunk of stream) {
passThrough.write(`data: ${JSON.stringify(chunk)}\n\n`);
}
passThrough.end();
})();
} catch (error) {
console.error('Error during RAG query:', error);
// 生产级的错误处理需要更细致,例如区分是用户输入问题还是下游服务问题
ctx.status = 500;
// 不直接暴露内部错误给客户端
ctx.body = { error: 'An internal error occurred while processing your request.' };
}
});
app.use(router.routes()).use(router.allowedMethods());
const PORT = process.env.PORT || 3001;
app.listen(PORT, () => {
console.log(`Server listening on port ${PORT}`);
});
至此,我们有了一个功能完备但完全不透明的 RAG 服务。
第二步:注入灵魂 - 使用 OpenTelemetry 实现后端链路追踪
现在,我们开始植入可观测性。首先是在 Koa 层面。我们需要捕获所有进入的 HTTP 请求,并为每个请求创建一个父 Span。
backend/instrumentation.ts
: OpenTelemetry SDK 初始化
这是整个追踪系统的起点。这段代码必须在你的应用代码加载之前执行。通常通过 node --import ./instrumentation.ts server.ts
来启动。
// backend/instrumentation.ts
import { NodeSDK } from '@opentelemetry/sdk-node';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { ConsoleSpanExporter, SimpleSpanProcessor } from '@opentelemetry/sdk-trace-node';
// 配置服务资源信息,这些信息会附加到所有遥测数据上
const resource = new Resource({
[SemanticResourceAttributes.SERVICE_NAME]: 'rag-backend-service',
[SemanticResourceAttributes.SERVICE_VERSION]: '1.0.0',
});
// 配置 Exporter,决定追踪数据发送到哪里。
// 在开发中,ConsoleSpanExporter 非常好用,可以直接在控制台看到输出。
// 生产环境中,应替换为 OTLPTraceExporter,指向 Jaeger, Zipkin, 或其他兼容的收集器。
const traceExporter = new ConsoleSpanExporter();
// const traceExporter = new OTLPTraceExporter({
// url: 'http://localhost:4318/v1/traces', // OTel Collector 的地址
// });
const spanProcessor = new SimpleSpanProcessor(traceExporter);
const sdk = new NodeSDK({
resource,
spanProcessor,
// 自动仪表化 Node.js 核心模块和流行库
// @opentelemetry/instrumentation-koa 会自动为每个请求创建 Span
instrumentations: [getNodeAutoInstrumentations()],
});
// 优雅地关闭 SDK
process.on('SIGTERM', () => {
sdk.shutdown()
.then(() => console.log('Tracing terminated'))
.catch((error) => console.log('Error terminating tracing', error))
.finally(() => process.exit(0));
});
// 启动 SDK
sdk.start();
console.log('OpenTelemetry SDK started.');
启动命令需要修改为:
node --import ./instrumentation.ts server.ts
现在,每次调用 /api/chat/stream
,Koa 的自动仪表化库就会创建一个名为 POST /api/chat/stream
的 Span。但我们更关心的是这个 Span 内部发生了什么。
第三步:深入黑盒 - 自定义 LlamaIndex Callback 来创建子 Span
这是本文的核心。我们需要创建一个自定义的 CallbackHandler
,它遵循 LlamaIndex 的接口,并在各个关键事件(如检索开始/结束,LLM 调用开始/结束)上创建和关闭 OpenTelemetry Span。
// backend/llama.ts (修改和增加的部分)
import {
// ... 其他 imports
CallbackManager,
Event,
CBEventType,
} from "llamaindex";
import opentelemetry, { SpanStatusCode, Span } from '@opentelemetry/api';
// 获取一个 Tracer 实例
const tracer = opentelemetry.trace.getTracer('llamaindex-instrumentation');
class OpenTelemetryCallbackHandler {
// 使用 Map 来存储正在进行的 Span,key 是事件 ID
private spanMap: Map<string, Span> = new Map();
// 定义事件类型到 Span 名称的映射
private eventTypeToSpanName: Record<CBEventType, string> = {
[CBEventType.CHUNKING]: "llamaindex-chunking",
[CBEventType.NODE_PARSING]: "llamaindex-node-parsing",
[CBEventType.EMBEDDING]: "llamaindex-embedding",
[CBEventType.LLM]: "llamaindex-llm-call",
[CBEventType.QUERY]: "llamaindex-query",
[CBEventType.RETRIEVE]: "llamaindex-retrieve",
[CBEventType.SYNTHESIZE]: "llamaindex-synthesize",
[CBEventType.TREE]: "llamaindex-tree-generation",
[CBEventType.SUB_QUESTION]: "llamaindex-sub-question",
[CBEventType.TEMPLATING]: "llamaindex-templating",
[CBEventType.FUNCTION_CALL]: "llamaindex-function-call",
[CBEventType.REGISTER_EVENT]: "llamaindex-register-event",
[CBEventType.AGENT_STEP]: "llamaindex-agent-step",
[CBEventType.AGENT_SEARCH]: "llamaindex-agent-search",
}
startTrace(event: Event) {
if (!event.id) return;
// 从当前上下文中获取父 Span (即 Koa 中间件创建的那个 Span)
const parentSpan = opentelemetry.trace.getActiveSpan();
const ctx = parentSpan ? opentelemetry.trace.setSpan(opentelemetry.context.active(), parentSpan) : undefined;
const spanName = this.eventTypeToSpanName[event.type] || `llamaindex-unknown-${event.type}`;
const span = tracer.startSpan(spanName, undefined, ctx);
// 为 Span 添加丰富的属性(attributes),这对于后续分析至关重要
span.setAttribute('llamaindex.event.type', event.type);
span.setAttribute('llamaindex.event.id', event.id);
// 如果是 LLM 调用,记录模型信息
if (event.type === CBEventType.LLM && event.detail?.messages) {
const model = event.detail.messages[0]?.additionalKwargs?.model;
if(model) {
span.setAttribute('llm.model', model);
}
}
this.spanMap.set(event.id, span);
}
endTrace(event: Event) {
if (!event.id) return;
const span = this.spanMap.get(event.id);
if (span) {
if (event.detail?.error) {
span.recordException(event.detail.error);
span.setStatus({ code: SpanStatusCode.ERROR, message: event.detail.error.message });
} else {
span.setStatus({ code: SpanStatusCode.OK });
}
span.end();
this.spanMap.delete(event.id);
}
}
}
// 修改 getLlamaQueryEngine 以使用我们的 CallbackHandler
export async function getLlamaQueryEngine() {
if (!queryEngineInstance) {
const callbackManager = new CallbackManager();
const otelCallbackHandler = new OpenTelemetryCallbackHandler();
// 注册回调
callbackManager.on(CBEventType.QUERY, (event) => otelCallbackHandler.startTrace(event));
callbackManager.on(CBEventType.QUERY, (event) => otelCallbackHandler.endTrace(event), true);
callbackManager.on(CBEventType.RETRIEVE, (event) => otelCallbackHandler.startTrace(event));
callbackManager.on(CBEventType.RETRIEVE, (event) => otelCallbackHandler.endTrace(event), true);
callbackManager.on(CBEventType.SYNTHESIZE, (event) => otelCallbackHandler.startTrace(event));
callbackManager.on(CBEventType.SYNTHESIZE, (event) => otelCallbackHandler.endTrace(event), true);
callbackManager.on(CBEventType.LLM, (event) => otelCallbackHandler.startTrace(event));
callbackManager.on(CBEventType.LLM, (event) => otelCallbackHandler.endTrace(event), true);
queryEngineInstance = await initializeIndex(callbackManager);
}
return queryEngineInstance;
}
// initializeIndex 函数也需要接受 callbackManager
// ... (之前的 initializeIndex 代码) ...
这里的逻辑非常清晰:我们创建了一个 OpenTelemetryCallbackHandler
类,它监听 LlamaIndex 的内部事件。当一个查询开始时 (CBEventType.QUERY
start),我们创建一个名为 llamaindex-query
的 Span。在这个查询内部,当检索步骤开始时 (CBEventType.RETRIEVE
start),我们又创建一个子 Span llamaindex-retrieve
。当这些步骤结束时,我们关闭对应的 Span。通过这种方式,我们将 LlamaIndex 的执行流程完美地映射到了 OpenTelemetry 的分布式追踪模型中。
现在,再次运行应用并发起一个查询,你会在控制台看到这样的输出:
// ConsoleSpanExporter 的简化输出
TraceId: a1b2c3d4...
SpanId: e5f6g7h8...
ParentSpanId: d4e5f6g7...
Name: llamaindex-retrieve
Status: Ok
...
TraceId: a1b2c3d4...
SpanId: i9j0k1l2...
ParentSpanId: d4e5f6g7...
Name: llamaindex-synthesize
Status: Ok
...
TraceId: a1b2c3d4...
SpanId: d4e5f6g7...
ParentSpanId: c3d4e5f6...
Name: llamaindex-query
Status: Ok
...
TraceId: a1b2c3d4...
SpanId: c3d4e5f6...
ParentSpanId: (undefined)
Name: POST /api/chat/stream
Status: Ok
...
使用 Jaeger UI 查看,会得到一个清晰的火焰图,显示出 POST /api/chat/stream
调用中,llamaindex-query
占用了多少时间,而 llamaindex-query
内部,又是 retrieve
和 synthesize
各自的耗时。我们的黑盒被打开了。
sequenceDiagram participant C as Client participant K as Koa Server participant L as LlamaIndex participant O as OpenTelemetry C->>+K: POST /api/chat/stream (query) O-->>K: instrumentation.koa starts root span `POST /...` K->>+L: queryEngine.query(query) O-->>L: CallbackManager starts child span `llamaindex-query` L->>+L: Retrieval Step O-->>L: CallbackManager starts sub-child span `llamaindex-retrieve` L-->>-L: Retrieval Done O-->>L: CallbackManager ends `llamaindex-retrieve` L->>+L: Synthesis Step (LLM call) O-->>L: CallbackManager starts sub-child span `llamaindex-synthesize` L-->>-L: Synthesis Done O-->>L: CallbackManager ends `llamaindex-synthesize` L-->>-K: Stream Chunks... O-->>L: CallbackManager ends `llamaindex-query` K-->>-C: Stream Response O-->>K: instrumentation.koa ends root span `POST /...`
第四步:构建前端 - 使用 Recoil 优雅地处理流式状态
前端的挑战在于如何处理流式响应并将其平滑地展示给用户,同时保持应用状态的一致性。Recoil 在这方面表现出色。
定义 Recoil 状态
// frontend/src/state/atoms.ts
import { atom } from 'recoil';
export interface ChatMessage {
id: string;
role: 'user' | 'assistant';
content: string;
}
// 存储整个对话历史
export const chatHistoryState = atom<ChatMessage[]>({
key: 'chatHistoryState',
default: [],
});
// 存储当前正在输入的查询
export const queryInputState = atom<string>({
key: 'queryInputState',
default: '',
});
// 跟踪查询状态:idle, loading, streaming, error
export const queryStatusState = atom<'idle' | 'loading' | 'streaming' | 'error'>({
key: 'queryStatusState',
default: 'idle',
});
实现核心组件和交互逻辑
// frontend/src/components/Chat.tsx
import React from 'react';
import { useRecoilState, useSetRecoilState } from 'recoil';
import { chatHistoryState, queryInputState, queryStatusState } from '../state/atoms';
import { v4 as uuidv4 } from 'uuid';
export const Chat = () => {
const [query, setQuery] = useRecoilState(queryInputTate);
const [history, setHistory] = useRecoilState(chatHistoryState);
const setStatus = useSetRecoilState(queryStatusState);
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
if (!query.trim()) return;
// 1. 更新对话历史和状态
const userMessage = { id: uuidv4(), role: 'user', content: query };
const assistantMessageId = uuidv4();
setHistory(prev => [...prev, userMessage, { id: assistantMessageId, role: 'assistant', content: '' }]);
setStatus('loading');
setQuery('');
try {
const response = await fetch('http://localhost:3001/api/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ query }),
});
if (!response.ok || !response.body) {
throw new Error(`Request failed with status ${response.status}`);
}
setStatus('streaming');
const reader = response.body.getReader();
const decoder = new TextDecoder();
// 2. 处理流式响应
while (true) {
const { done, value } = await reader.read();
if (done) break;
const rawChunk = decoder.decode(value);
// SSE (Server-Sent Events) 格式解析
const lines = rawChunk.split('\n').filter(line => line.startsWith('data: '));
for (const line of lines) {
const jsonString = line.substring(6);
const parsedChunk = JSON.parse(jsonString);
// 3. 实时更新 Recoil state
setHistory(prevHistory =>
prevHistory.map(msg =>
msg.id === assistantMessageId
? { ...msg, content: msg.content + parsedChunk.response }
: msg
)
);
}
}
} catch (error) {
console.error("Streaming failed:", error);
setStatus('error');
setHistory(prev => prev.map(msg =>
msg.id === assistantMessageId
? { ...msg, content: 'Sorry, an error occurred.' }
: msg
));
} finally {
setStatus('idle');
}
};
// ... JSX for rendering chat UI
};
这段代码展示了 Recoil 的威力:
- 提交查询时,我们立即更新
chatHistoryState
和queryStatusState
,UI 几乎是瞬时响应。 - 在
fetch
请求的while
循环中,每当接收到一个新的数据块,我们都通过setHistory
更新助理的回复内容。Recoil 会高效地只重新渲染需要更新的部分。 - 整个异步流程的状态(loading, streaming, error, idle)都由
queryStatusState
这个原子精确管理,UI 的不同状态(如禁用输入框)可以轻易地与之绑定。
虽然我们没有在前端实现完整的 OpenTelemetry 追踪,但这是逻辑上的下一步。通过 @opentelemetry/instrumentation-fetch
,fetch
请求会自动携带 traceparent
头,将前端用户操作和后端处理链路无缝地连接起来,形成一个完整的、端到端的 trace。
方案局限性与未来展望
这个实现虽然打通了全链路追踪,但在生产环境中还有几个需要注意的地方:
- Exporter 配置:
ConsoleSpanExporter
仅适用于开发。生产系统必须使用OTLPTraceExporter
将数据发送到 OpenTelemetry Collector,再由 Collector 进行处理、采样并转发到后端存储如 Jaeger 或 Prometheus。 - 采样策略: 在高流量系统中,追踪所有请求的成本是巨大的。必须配置采样策略,例如基于头部的采样或尾部采样,只记录一部分有代表性的请求或所有错误的请求。
- 上下文传播: 当前方案依赖于 LlamaIndex 的
CallbackManager
。如果 RAG 管道中包含其他异步操作(如调用外部 API),需要确保 OpenTelemetry 的Context
在这些异步边界之间正确传播,否则追踪链会断裂。 - 指标与日志的关联: 真正的可观测性是 Traces, Metrics, Logs 的结合。下一步应该在关键代码路径中记录带有
trace_id
和span_id
的结构化日志,并上报关键业务指标(如 token 生成速度、检索命中率),从而实现三者的关联分析。
通过这套机制,我们成功地将一个原本模糊的 RAG 执行过程,变成了一个可以被精确度量和分析的系统。当下一个性能问题出现时,我们不再需要猜测,而是可以直接通过追踪数据定位到具体的瓶颈,无论是代码逻辑、模型性能还是基础设施问题。