我们团队最近上线了一个基于 LangChain 的内部知识库问答机器人,底层使用 FastAPI 驱动。初版反响不错,但一个致命问题很快暴露出来:它是一个完全匿名的服务。所有请求看起来都一样,这在企业环境中引发了两个核心痛点:
- 访问控制缺失: 任何能访问内网的人都可以调用它的 API,我们无法基于用户的部门或角色进行权限控制。
- 问题排查困难: 当 LangChain 的执行链中某个环节(如调用 LLM、访问向量数据库)出错时,Sentry 捕获到的异常是匿名的。我们不知道是哪个用户的哪次会话导致了问题,也无法联系用户获取更多上下文,排查效率极低。
技术目标很明确:为这个 LangChain 服务集成公司现有的 SAML 2.0 单点登录(SSO)体系,并将认证后的用户身份信息贯穿整个请求链路,最终在 Sentry 的错误报告中清晰地呈现出来。这不仅仅是加一个登录页面那么简单,关键在于如何将 Web 层的身份上下文(SAML Assertion)安全、可靠地传递到 LangChain 的异步执行环境中,并与可观测性系统深度集成。
初步构想与技术选型
最初的想法是“在 FastAPI 上加个 SAML 中间件”。这个方向没错,但魔鬼在细节里。整个流程应该如下:
sequenceDiagram participant User as 用户 participant Browser as 浏览器 participant FastAPI as LangChain后端 participant IdP as SAML IdP (如Okta, Azure AD) participant Sentry as Sentry平台 User->>Browser: 访问受保护的问答页面 Browser->>FastAPI: 发起请求 /api/ask FastAPI-->>Browser: 302 重定向至 IdP Browser->>IdP: 发起 SAML AuthnRequest IdP-->>User: 要求用户登录 User->>IdP: 输入凭证 IdP-->>Browser: 返回签名的 SAML Response (POST) Browser->>FastAPI: POST SAML Response至 /acs (Assertion Consumer Service) FastAPI->>FastAPI: 校验 SAML Response, 解析用户身份 FastAPI->>FastAPI: 创建会话, 注入用户上下文 FastAPI-->>Browser: 302 重定向回原始问答页面 Browser->>FastAPI: 再次请求 /api/ask (携带会话Cookie) FastAPI->>FastAPI: 中间件校验会话, 设置Sentry Scope, 填充ContextVar Note over FastAPI: LangChain Agent 执行... FastAPI-->>Browser: 返回问答结果 alt 发生异常 FastAPI->>Sentry: 发送异常事件 (包含用户ID, email等) end
这个流程的核心在于,FastAPI 在 /acs
路由上完成认证后,如何将用户身份(比如 user_id
, email
)与后续的 API 请求关联起来。
技术栈选择:
- Web 框架: FastAPI。其原生的
async/await
支持与 LangChain 的异步执行链完美契合,这是关键。 - SAML 库:
python3-saml
。这是一个成熟且功能完备的库,虽然配置稍显复杂,但稳定性有保障。 - 上下文传递:
contextvars
。Python 标准库,专门用于在异步任务中安全地传递上下文信息,避免了在每个函数签名中手动传递user
对象的窘境。 - 错误监控:
sentry-sdk
。Sentry 官方 Python SDK,提供了丰富的上下文管理 API。
步骤化实现:从认证到可观测性闭环
1. SAML 基础配置与 FastAPI 集成
第一步是让 FastAPI 能够处理 SAML 流程。这需要配置 IdP(身份提供商)和 SP(服务提供商,即我们的 FastAPI 应用)的信息。
假设我们的 FastAPI 应用运行在 http://localhost:8000
。我们需要一个地方存放 SAML 配置。
文件结构:
.
├── saml_config/
│ ├── advanced_settings.json
│ ├── settings.json
│ ├── idp_metadata.xml # 从IdP处下载
├── main.py
└── requirements.txt
saml_config/settings.json
(SP 配置):
{
"strict": true,
"debug": true,
"sp": {
"entityId": "my-langchain-app-sp",
"assertionConsumerService": {
"url": "http://localhost:8000/api/saml/acs",
"binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST"
},
"singleLogoutService": {
"url": "http://localhost:8000/api/saml/sls",
"binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
},
"nameIdFormat": "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress"
},
"idp": {
"entityId": "http://www.okta.com/exkabcdefg1234567890", // 从 IdP metadata 中获取
"singleSignOnService": {
"url": "https://your-idp.okta.com/app/your-app/sso/saml", // 从 IdP metadata 中获取
"binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
},
"singleLogoutService": {
"url": "https://your-idp.okta.com/app/your-app/slo/saml", // 从 IdP metadata 中获取
"binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
},
"x509cert": "MIIDq...your...cert...END CERTIFICATE-----\n" // 从 IdP metadata 中获取
}
}
saml_config/advanced_settings.json
(安全配置):
{
"security": {
"nameIdEncrypted": false,
"authnRequestsSigned": false,
"logoutRequestSigned": false,
"logoutResponseSigned": false,
"signMetadata": false,
"wantMessagesSigned": false,
"wantAssertionsSigned": true, // 关键:必须要求断言是签名的
"wantAssertionsEncrypted": false,
"wantNameId": true,
"wantNameIdEncrypted": false,
"requestedAuthnContext": true,
"signatureAlgorithm": "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256",
"digestAlgorithm": "http://www.w3.org/2001/04/xmlenc#sha256"
}
}
在真实项目中,authnRequestsSigned
等都应设为 true
,并提供 SP 的私钥和证书。
2. 编写 SAML 工具类和 FastAPI 路由
为了代码整洁,我们把 python3-saml
的初始化逻辑封装起来。
# main.py (部分代码)
import os
import logging
from fastapi import FastAPI, Request, Depends, HTTPException
from fastapi.responses import RedirectResponse
from starlette.middleware.sessions import SessionMiddleware
from onelogin.saml2.auth import OneLogin_Saml2_Auth
from onelogin.saml2.utils import OneLogin_Saml2_Utils
# ... Sentry 和 LangChain 的 imports ...
# 日志配置
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI()
# SessionMiddleware 是必需的,用于在 SAML 重定向后保持用户状态
app.add_middleware(SessionMiddleware, secret_key="a_very_secret_key_for_dev")
def prepare_saml_request(request: Request):
"""
根据 FastAPI 的请求对象准备 SAML 请求,主要是构建 URL。
"""
return {
'https': 'on' if request.url.scheme == 'https' else 'off',
'http_host': request.url.netloc,
'script_name': request.url.path,
'get_data': request.query_params,
'post_data': {} # 将在路由中填充
}
def init_saml_auth(saml_req: dict):
"""
初始化 SAML Auth 对象。
这里的坑在于:`python3-saml` 库的配置加载不是线程安全的,
但在 FastAPI 这种 ASGI 应用中,每次请求都初始化是安全的。
"""
old_cwd = os.getcwd()
try:
# 库的路径加载有 bug,需要切换工作目录
config_path = os.path.join(os.path.dirname(__file__), 'saml_config')
os.chdir(config_path)
auth = OneLogin_Saml2_Auth(saml_req, old_settings_path=os.path.join(config_path))
finally:
os.chdir(old_cwd)
return auth
@app.post("/api/saml/acs")
async def process_acs(request: Request):
"""
Assertion Consumer Service: SAML IdP 回调的端点
"""
req = prepare_saml_request(request)
form_data = await request.form()
req['post_data'] = {k: v for k, v in form_data.items()}
auth = init_saml_auth(req)
auth.process_response()
errors = auth.get_errors()
if errors:
logger.error(f"SAML ACS Error: {errors}, Reason: {auth.get_last_error_reason()}")
raise HTTPException(status_code=401, detail=f"SAML Error: {auth.get_last_error_reason()}")
if not auth.is_authenticated():
logger.warning("SAML ACS: Not authenticated.")
raise HTTPException(status_code=401, detail="Not authenticated")
# 认证成功,将用户信息存入 session
request.session['samlUserdata'] = auth.get_attributes()
request.session['samlNameId'] = auth.get_nameid()
request.session['samlSessionIndex'] = auth.get_session_index()
logger.info(f"User {auth.get_nameid()} successfully authenticated.")
# 重定向到用户最初想访问的页面
relay_state = req['post_data'].get('RelayState', '/')
return RedirectResponse(url=relay_state)
3. 保护 API 端点并传递上下文
现在,我们需要一个依赖项或中间件来保护我们的 LangChain API,并在这里把用户身份注入到请求上下文中。这里是 contextvars
发挥作用的地方。
# main.py (续)
import contextvars
from typing import Optional, Dict
# 创建一个 ContextVar,用于在异步任务中安全地存储用户信息
# 这是一个关键步骤,它解决了如何在 LangChain 的深层调用中获取用户信息的问题
user_context = contextvars.ContextVar("user_context", default=None)
# Sentry SDK 初始化
import sentry_sdk
sentry_sdk.init(
dsn="YOUR_SENTRY_DSN",
# 启用性能监控
traces_sample_rate=1.0,
)
async def get_current_user(request: Request) -> Dict:
"""
FastAPI 依赖项,用于保护路由。
它会检查 session,如果未认证,则重定向到 SAML 登录流程。
认证成功后,它会设置 Sentry scope 和 contextvars。
"""
if 'samlUserdata' in request.session:
user_data = {
"id": request.session.get('samlNameId'),
"email": request.session.get('samlNameId'), # 假设 NameId 就是 email
"attributes": request.session.get('samlUserdata')
}
# 1. 设置 Sentry User Scope
# 这是将错误与用户关联起来的核心
sentry_sdk.set_user({"id": user_data["id"], "email": user_data["email"]})
# 2. 将用户数据放入 ContextVar
# 这样 LangChain 的任何部分都可以安全地访问它
user_context.set(user_data)
logger.info(f"Authenticated user found in session: {user_data['id']}")
return user_data
# 用户未认证,启动 SAML 登录流程
saml_req = prepare_saml_request(request)
auth = init_saml_auth(saml_req)
# RelayState 用于在登录后返回当前页面
redirect_url = auth.login(return_to=request.url.path)
return RedirectResponse(url=redirect_url)
@app.get("/api/ask")
async def ask_question(question: str, user: Dict = Depends(get_current_user)):
"""
受保护的 LangChain API 端点
"""
# 在这里,我们可以通过 contextvars 获取用户信息,而无需从函数参数中传递
current_user = user_context.get()
logger.info(f"User {current_user['id']} is asking a question: '{question}'")
try:
# 假设我们有一个 LangChain agent
# response = await agent.arun(question)
# 模拟一个可能出错的操作
if "error" in question.lower():
raise ValueError("Simulated error in LangChain agent execution.")
response = f"Answer for '{question}' by user {current_user.get('id')}"
# Sentry breadcrumb: 记录操作路径
sentry_sdk.add_breadcrumb(
category='langchain.call',
message=f'Successfully processed question from {current_user.get("id")}',
level='info',
data={'question': question, 'response_length': len(response)}
)
return {"answer": response}
except Exception as e:
# Sentry 会自动捕获未处理的异常。
# 由于我们已经在 get_current_user 中设置了 user scope,
# Sentry 报告将自动关联到当前用户。
logger.error(f"Error processing question for user {current_user['id']}: {e}", exc_info=True)
# 手动设置一些 LangChain 相关的 tag,便于在 Sentry 中筛选
sentry_sdk.set_tag("langchain.agent", "knowledge_base_agent")
sentry_sdk.set_tag("user_id", current_user['id'])
# 重新抛出异常,让 FastAPI 的异常处理中间件处理
raise
4. 在 LangChain 内部访问上下文
真正的挑战在于,如果 LangChain 的执行链非常复杂,包含多个异步工具调用,我们如何确保在最深处仍然能访问到用户信息?contextvars
就是为此而生的。
让我们创建一个自定义的 LangChain CallbackHandler,它能自动记录工具的使用情况,并将其作为 Sentry 的 breadcrumb
。
# main.py (续)
from langchain.callbacks.base import AsyncCallbackHandler
from typing import Any, Dict, List
from uuid import UUID
class SentryCallbackHandler(AsyncCallbackHandler):
"""
一个 LangChain 回调处理器,它能将 LangChain 的执行步骤记录为 Sentry 的面包屑。
"""
async def on_tool_start(
self,
serialized: Dict[str, Any],
input_str: str,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
tags: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Any:
# 这里的代码在 LangChain 的异步执行环境中运行
# 但我们仍然可以访问到由 FastAPI 中间件设置的 context var
user = user_context.get()
user_id = user.get("id", "anonymous") if user else "anonymous"
sentry_sdk.add_breadcrumb(
category="langchain.tool.start",
message=f"Tool '{serialized.get('name', 'unknown_tool')}' started.",
level="info",
data={
"user_id": user_id,
"tool_input": input_str,
"run_id": str(run_id),
},
)
logger.info(f"[User: {user_id}] Tool start: {serialized.get('name')}")
async def on_tool_error(
self,
error: BaseException,
*,
run_id: UUID,
parent_run_id: Optional[UUID] = None,
**kwargs: Any,
) -> Any:
user = user_context.get()
user_id = user.get("id", "anonymous") if user else "anonymous"
# 记录工具级别的错误,并附加上下文
sentry_sdk.capture_exception(error)
sentry_sdk.add_breadcrumb(
category="langchain.tool.error",
message=f"Tool execution failed for user {user_id}.",
level="error",
data={"run_id": str(run_id)},
)
logger.error(f"[User: {user_id}] Tool error: {error}", exc_info=True)
# ... 在创建 LangChain agent 时,将这个 handler 加入 callbacks ...
# from langchain.agents import initialize_agent, Tool
# sentry_handler = SentryCallbackHandler()
# agent = initialize_agent(
# tools=[...],
# llm=...,
# callbacks=[sentry_handler]
# )
现在,当我们的 LangChain Agent 执行时,每次调用工具,SentryCallbackHandler
都会被触发。由于 contextvars
的魔力,即使在 langchain
库的内部异步回调中,user_context.get()
也能正确地获取到在 FastAPI 依赖注入函数 get_current_user
中设置的用户信息。
当 ask_question
端点中 ValueError
被抛出时,Sentry 会创建一个 Issue。点开这个 Issue,你会看到:
- User: 明确标识了用户的 ID 和 email。
- Tags: 包含我们自定义的
langchain.agent
和user_id
标签。 - Breadcrumbs (面包屑): 按照时间顺序清晰地列出了请求的处理流程,包括
Authenticated user found...
,User ... is asking a question...
,以及由SentryCallbackHandler
记录的langchain.tool.start
等步骤。
这提供了一个完整的、带有丰富用户和业务上下文的错误快照,排查问题的起点从“一个未知错误发生了”变成了“用户 A 在调用工具 B 时,传入参数 C 后系统崩溃了”。
5. 完整的 FastAPI 应用示例
# main.py - 整合所有部分
import os
import logging
import contextvars
from typing import Optional, Dict, Any, List
from uuid import UUID
import sentry_sdk
from fastapi import FastAPI, Request, Depends, HTTPException
from fastapi.responses import RedirectResponse
from starlette.middleware.sessions import SessionMiddleware
from onelogin.saml2.auth import OneLogin_Saml2_Auth
from langchain.callbacks.base import AsyncCallbackHandler
# --- 配置区 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
SENTRY_DSN = os.environ.get("SENTRY_DSN", None)
SESSION_SECRET_KEY = os.environ.get("SESSION_SECRET_KEY", "a_very_secret_key_for_development")
# --- Sentry 和 ContextVar 初始化 ---
if SENTRY_DSN:
sentry_sdk.init(dsn=SENTRY_DSN, traces_sample_rate=1.0)
logger.info("Sentry SDK initialized.")
user_context = contextvars.ContextVar("user_context", default=None)
# --- LangChain 自定义回调 ---
class SentryCallbackHandler(AsyncCallbackHandler):
"""一个 LangChain 回调处理器,它能将 LangChain 的执行步骤记录为 Sentry 的面包屑。"""
async def on_tool_start(
self, serialized: Dict[str, Any], input_str: str, *,
run_id: UUID, parent_run_id: Optional[UUID] = None, **kwargs: Any
) -> Any:
user = user_context.get()
user_id = user.get("id", "anonymous") if user else "anonymous"
sentry_sdk.add_breadcrumb(
category="langchain.tool.start",
message=f"Tool '{serialized.get('name', 'unknown_tool')}' started by {user_id}.",
level="info", data={"tool_input": input_str, "run_id": str(run_id)}
)
# ... on_tool_error, on_llm_start 等其他回调 ...
# --- FastAPI 应用 ---
app = FastAPI(title="Secure LangChain Service")
app.add_middleware(SessionMiddleware, secret_key=SESSION_SECRET_KEY)
# --- SAML 辅助函数 ---
def prepare_saml_request(request: Request) -> dict:
return {
'https': 'on' if request.url.scheme == 'https' else 'off',
'http_host': request.url.netloc,
'script_name': request.url.path,
'get_data': dict(request.query_params),
'post_data': {}
}
def init_saml_auth(saml_req: dict) -> OneLogin_Saml2_Auth:
# 避免 CWD 问题,这是 python3-saml 的一个已知问题
config_path = os.path.join(os.path.dirname(__file__), 'saml_config')
return OneLogin_Saml2_Auth(saml_req, custom_base_path=config_path)
# --- SAML 路由 ---
@app.post("/api/saml/acs", tags=["SAML"])
async def process_acs(request: Request):
"""Assertion Consumer Service (ACS)"""
req = prepare_saml_request(request)
form_data = await request.form()
req['post_data'] = dict(form_data)
auth = init_saml_auth(req)
auth.process_response()
errors = auth.get_errors()
if errors:
logger.error(f"SAML ACS Error: {errors}, Reason: {auth.get_last_error_reason()}")
raise HTTPException(status_code=401, detail=f"SAML Error: {auth.get_last_error_reason()}")
if not auth.is_authenticated():
raise HTTPException(status_code=401, detail="Not authenticated via SAML")
request.session['samlUserdata'] = auth.get_attributes()
request.session['samlNameId'] = auth.get_nameid()
logger.info(f"User {auth.get_nameid()} authenticated via SAML.")
relay_state = req['post_data'].get('RelayState', '/')
return RedirectResponse(url=relay_state)
# --- 安全依赖项 ---
async def get_current_user(request: Request) -> Dict:
if 'samlUserdata' in request.session:
user_data = {
"id": request.session.get('samlNameId'),
"email": request.session.get('samlNameId'),
"attributes": request.session.get('samlUserdata')
}
with sentry_sdk.configure_scope() as scope:
scope.user = {"id": user_data["id"], "email": user_data["email"]}
user_context.set(user_data)
return user_data
req = prepare_saml_request(request)
auth = init_saml_auth(req)
return RedirectResponse(url=auth.login(return_to=str(request.url)))
# --- 应用 API ---
@app.get("/api/ask", tags=["LangChain"])
async def ask_question(question: str, user: Dict = Depends(get_current_user)):
"""受保护的 LangChain API 端点,模拟执行并可能触发错误。"""
current_user_id = user['id']
sentry_sdk.set_tag("conversation_id", str(UUID(int=0))) # 示例: 关联会话ID
logger.info(f"User {current_user_id} asking: '{question}'")
sentry_sdk.add_breadcrumb(category='api.call', message=f'Question received from {current_user_id}.', level='info')
try:
# 在这里实例化并运行你的 LangChain Agent
# agent = initialize_agent(..., callbacks=[SentryCallbackHandler()])
# await agent.arun(question)
if "critical_error" in question.lower():
# 模拟一个未捕获的严重错误
1 / 0
response = f"This is a mocked answer for '{question}' from user {current_user_id}."
return {"answer": response, "user": current_user_id}
except Exception:
# 异常会被 Sentry 自动捕获,并且已包含用户上下文
logger.error(f"Error processing question for user {current_user_id}", exc_info=True)
# 抛出 HTTP 异常给客户端
raise HTTPException(status_code=500, detail="An internal error occurred in the LangChain agent.")
@app.get("/", tags=["Public"])
def read_root():
return {"message": "Welcome. Please access /api/ask to use the service."}
局限性与未来迭代
这个方案成功地将 SAML 认证、LangChain 核心逻辑和 Sentry 可观测性无缝地整合在一起,解决了我们最初的痛点。然而,它并非没有局限性:
- 会话管理: 当前的实现依赖于
starlette.middleware.sessions.SessionMiddleware
,它默认使用签名的 Cookie 来存储会话。在生产环境中,这不适用于多实例部署。一个更健壮的方案是使用一个集中的会话存储,如 Redis,并配合SessionMiddleware
的相应后端。 - SAML 配置硬编码: 将 SAML 配置放在 JSON 文件中不便于管理。在生产系统中,这些敏感信息应该通过配置中心或环境变量注入,特别是证书和私钥。
- 单点登录(SLO): 我们只实现了登录(SSO)流程。一个完整的 SAML 集成还应实现单点登出(SLO),处理来自 IdP 的登出请求,并清理本地会话。
- 分布式追踪: 当前的 Sentry 上下文仅限于单个服务内部。如果 LangChain Agent 会调用其他微服务(例如,一个专门的用户画像服务),那么需要引入 OpenTelemetry 并配置 Sentry 的分布式追踪,以将
trace_id
跨服务传递,从而获得真正的端到端可观测性。
尽管存在这些可迭代点,但这个架构为构建安全、可观测、企业级的 LangChain 应用提供了一个坚实的基础。通过 contextvars
这座桥梁,我们成功地打破了 Web 框架层和业务逻辑执行层之间的信息壁垒。