LangChain 企业级应用集成 SAML 身份认证与 Sentry 全链路可观测性实践


我们团队最近上线了一个基于 LangChain 的内部知识库问答机器人,底层使用 FastAPI 驱动。初版反响不错,但一个致命问题很快暴露出来:它是一个完全匿名的服务。所有请求看起来都一样,这在企业环境中引发了两个核心痛点:

  1. 访问控制缺失: 任何能访问内网的人都可以调用它的 API,我们无法基于用户的部门或角色进行权限控制。
  2. 问题排查困难: 当 LangChain 的执行链中某个环节(如调用 LLM、访问向量数据库)出错时,Sentry 捕获到的异常是匿名的。我们不知道是哪个用户的哪次会话导致了问题,也无法联系用户获取更多上下文,排查效率极低。

技术目标很明确:为这个 LangChain 服务集成公司现有的 SAML 2.0 单点登录(SSO)体系,并将认证后的用户身份信息贯穿整个请求链路,最终在 Sentry 的错误报告中清晰地呈现出来。这不仅仅是加一个登录页面那么简单,关键在于如何将 Web 层的身份上下文(SAML Assertion)安全、可靠地传递到 LangChain 的异步执行环境中,并与可观测性系统深度集成。

初步构想与技术选型

最初的想法是“在 FastAPI 上加个 SAML 中间件”。这个方向没错,但魔鬼在细节里。整个流程应该如下:

sequenceDiagram
    participant User as 用户
    participant Browser as 浏览器
    participant FastAPI as LangChain后端
    participant IdP as SAML IdP (如Okta, Azure AD)
    participant Sentry as Sentry平台

    User->>Browser: 访问受保护的问答页面
    Browser->>FastAPI: 发起请求 /api/ask
    FastAPI-->>Browser: 302 重定向至 IdP
    Browser->>IdP: 发起 SAML AuthnRequest
    IdP-->>User: 要求用户登录
    User->>IdP: 输入凭证
    IdP-->>Browser: 返回签名的 SAML Response (POST)
    Browser->>FastAPI: POST SAML Response至 /acs (Assertion Consumer Service)
    FastAPI->>FastAPI: 校验 SAML Response, 解析用户身份
    FastAPI->>FastAPI: 创建会话, 注入用户上下文
    FastAPI-->>Browser: 302 重定向回原始问答页面
    Browser->>FastAPI: 再次请求 /api/ask (携带会话Cookie)
    FastAPI->>FastAPI: 中间件校验会话, 设置Sentry Scope, 填充ContextVar
    Note over FastAPI: LangChain Agent 执行...
    FastAPI-->>Browser: 返回问答结果

    alt 发生异常
        FastAPI->>Sentry: 发送异常事件 (包含用户ID, email等)
    end

这个流程的核心在于,FastAPI 在 /acs 路由上完成认证后,如何将用户身份(比如 user_id, email)与后续的 API 请求关联起来。

技术栈选择:

  • Web 框架: FastAPI。其原生的 async/await 支持与 LangChain 的异步执行链完美契合,这是关键。
  • SAML 库: python3-saml。这是一个成熟且功能完备的库,虽然配置稍显复杂,但稳定性有保障。
  • 上下文传递: contextvars。Python 标准库,专门用于在异步任务中安全地传递上下文信息,避免了在每个函数签名中手动传递 user 对象的窘境。
  • 错误监控: sentry-sdk。Sentry 官方 Python SDK,提供了丰富的上下文管理 API。

步骤化实现:从认证到可观测性闭环

1. SAML 基础配置与 FastAPI 集成

第一步是让 FastAPI 能够处理 SAML 流程。这需要配置 IdP(身份提供商)和 SP(服务提供商,即我们的 FastAPI 应用)的信息。

假设我们的 FastAPI 应用运行在 http://localhost:8000。我们需要一个地方存放 SAML 配置。

文件结构:

.
├── saml_config/
│   ├── advanced_settings.json
│   ├── settings.json
│   ├── idp_metadata.xml  # 从IdP处下载
├── main.py
└── requirements.txt

saml_config/settings.json (SP 配置):

{
    "strict": true,
    "debug": true,
    "sp": {
        "entityId": "my-langchain-app-sp",
        "assertionConsumerService": {
            "url": "http://localhost:8000/api/saml/acs",
            "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST"
        },
        "singleLogoutService": {
            "url": "http://localhost:8000/api/saml/sls",
            "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
        },
        "nameIdFormat": "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress"
    },
    "idp": {
        "entityId": "http://www.okta.com/exkabcdefg1234567890", // 从 IdP metadata 中获取
        "singleSignOnService": {
            "url": "https://your-idp.okta.com/app/your-app/sso/saml", // 从 IdP metadata 中获取
            "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
        },
        "singleLogoutService": {
            "url": "https://your-idp.okta.com/app/your-app/slo/saml", // 从 IdP metadata 中获取
            "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
        },
        "x509cert": "MIIDq...your...cert...END CERTIFICATE-----\n" // 从 IdP metadata 中获取
    }
}

saml_config/advanced_settings.json (安全配置):

{
    "security": {
        "nameIdEncrypted": false,
        "authnRequestsSigned": false,
        "logoutRequestSigned": false,
        "logoutResponseSigned": false,
        "signMetadata": false,
        "wantMessagesSigned": false,
        "wantAssertionsSigned": true, // 关键:必须要求断言是签名的
        "wantAssertionsEncrypted": false,
        "wantNameId": true,
        "wantNameIdEncrypted": false,
        "requestedAuthnContext": true,
        "signatureAlgorithm": "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256",
        "digestAlgorithm": "http://www.w3.org/2001/04/xmlenc#sha256"
    }
}

在真实项目中,authnRequestsSigned 等都应设为 true,并提供 SP 的私钥和证书。

2. 编写 SAML 工具类和 FastAPI 路由

为了代码整洁,我们把 python3-saml 的初始化逻辑封装起来。

# main.py (部分代码)
import os
import logging
from fastapi import FastAPI, Request, Depends, HTTPException
from fastapi.responses import RedirectResponse
from starlette.middleware.sessions import SessionMiddleware
from onelogin.saml2.auth import OneLogin_Saml2_Auth
from onelogin.saml2.utils import OneLogin_Saml2_Utils

# ... Sentry 和 LangChain 的 imports ...

# 日志配置
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()
# SessionMiddleware 是必需的,用于在 SAML 重定向后保持用户状态
app.add_middleware(SessionMiddleware, secret_key="a_very_secret_key_for_dev")

def prepare_saml_request(request: Request):
    """
    根据 FastAPI 的请求对象准备 SAML 请求,主要是构建 URL。
    """
    return {
        'https': 'on' if request.url.scheme == 'https' else 'off',
        'http_host': request.url.netloc,
        'script_name': request.url.path,
        'get_data': request.query_params,
        'post_data': {} # 将在路由中填充
    }

def init_saml_auth(saml_req: dict):
    """
    初始化 SAML Auth 对象。
    这里的坑在于:`python3-saml` 库的配置加载不是线程安全的,
    但在 FastAPI 这种 ASGI 应用中,每次请求都初始化是安全的。
    """
    old_cwd = os.getcwd()
    try:
        # 库的路径加载有 bug,需要切换工作目录
        config_path = os.path.join(os.path.dirname(__file__), 'saml_config')
        os.chdir(config_path)
        auth = OneLogin_Saml2_Auth(saml_req, old_settings_path=os.path.join(config_path))
    finally:
        os.chdir(old_cwd)
    return auth

@app.post("/api/saml/acs")
async def process_acs(request: Request):
    """
    Assertion Consumer Service: SAML IdP 回调的端点
    """
    req = prepare_saml_request(request)
    form_data = await request.form()
    req['post_data'] = {k: v for k, v in form_data.items()}

    auth = init_saml_auth(req)
    auth.process_response()
    errors = auth.get_errors()

    if errors:
        logger.error(f"SAML ACS Error: {errors}, Reason: {auth.get_last_error_reason()}")
        raise HTTPException(status_code=401, detail=f"SAML Error: {auth.get_last_error_reason()}")

    if not auth.is_authenticated():
        logger.warning("SAML ACS: Not authenticated.")
        raise HTTPException(status_code=401, detail="Not authenticated")

    # 认证成功,将用户信息存入 session
    request.session['samlUserdata'] = auth.get_attributes()
    request.session['samlNameId'] = auth.get_nameid()
    request.session['samlSessionIndex'] = auth.get_session_index()
    logger.info(f"User {auth.get_nameid()} successfully authenticated.")

    # 重定向到用户最初想访问的页面
    relay_state = req['post_data'].get('RelayState', '/')
    return RedirectResponse(url=relay_state)

3. 保护 API 端点并传递上下文

现在,我们需要一个依赖项或中间件来保护我们的 LangChain API,并在这里把用户身份注入到请求上下文中。这里是 contextvars 发挥作用的地方。

# main.py (续)
import contextvars
from typing import Optional, Dict

# 创建一个 ContextVar,用于在异步任务中安全地存储用户信息
# 这是一个关键步骤,它解决了如何在 LangChain 的深层调用中获取用户信息的问题
user_context = contextvars.ContextVar("user_context", default=None)

# Sentry SDK 初始化
import sentry_sdk

sentry_sdk.init(
    dsn="YOUR_SENTRY_DSN",
    # 启用性能监控
    traces_sample_rate=1.0,
)

async def get_current_user(request: Request) -> Dict:
    """
    FastAPI 依赖项,用于保护路由。
    它会检查 session,如果未认证,则重定向到 SAML 登录流程。
    认证成功后,它会设置 Sentry scope 和 contextvars。
    """
    if 'samlUserdata' in request.session:
        user_data = {
            "id": request.session.get('samlNameId'),
            "email": request.session.get('samlNameId'), # 假设 NameId 就是 email
            "attributes": request.session.get('samlUserdata')
        }
        
        # 1. 设置 Sentry User Scope
        # 这是将错误与用户关联起来的核心
        sentry_sdk.set_user({"id": user_data["id"], "email": user_data["email"]})
        
        # 2. 将用户数据放入 ContextVar
        # 这样 LangChain 的任何部分都可以安全地访问它
        user_context.set(user_data)
        
        logger.info(f"Authenticated user found in session: {user_data['id']}")
        return user_data

    # 用户未认证,启动 SAML 登录流程
    saml_req = prepare_saml_request(request)
    auth = init_saml_auth(saml_req)
    # RelayState 用于在登录后返回当前页面
    redirect_url = auth.login(return_to=request.url.path)
    return RedirectResponse(url=redirect_url)

@app.get("/api/ask")
async def ask_question(question: str, user: Dict = Depends(get_current_user)):
    """
    受保护的 LangChain API 端点
    """
    # 在这里,我们可以通过 contextvars 获取用户信息,而无需从函数参数中传递
    current_user = user_context.get()
    logger.info(f"User {current_user['id']} is asking a question: '{question}'")

    try:
        # 假设我们有一个 LangChain agent
        # response = await agent.arun(question)
        # 模拟一个可能出错的操作
        if "error" in question.lower():
            raise ValueError("Simulated error in LangChain agent execution.")
        
        response = f"Answer for '{question}' by user {current_user.get('id')}"

        # Sentry breadcrumb: 记录操作路径
        sentry_sdk.add_breadcrumb(
            category='langchain.call',
            message=f'Successfully processed question from {current_user.get("id")}',
            level='info',
            data={'question': question, 'response_length': len(response)}
        )
        return {"answer": response}

    except Exception as e:
        # Sentry 会自动捕获未处理的异常。
        # 由于我们已经在 get_current_user 中设置了 user scope,
        # Sentry 报告将自动关联到当前用户。
        logger.error(f"Error processing question for user {current_user['id']}: {e}", exc_info=True)
        # 手动设置一些 LangChain 相关的 tag,便于在 Sentry 中筛选
        sentry_sdk.set_tag("langchain.agent", "knowledge_base_agent")
        sentry_sdk.set_tag("user_id", current_user['id'])
        # 重新抛出异常,让 FastAPI 的异常处理中间件处理
        raise

4. 在 LangChain 内部访问上下文

真正的挑战在于,如果 LangChain 的执行链非常复杂,包含多个异步工具调用,我们如何确保在最深处仍然能访问到用户信息?contextvars 就是为此而生的。

让我们创建一个自定义的 LangChain CallbackHandler,它能自动记录工具的使用情况,并将其作为 Sentry 的 breadcrumb

# main.py (续)
from langchain.callbacks.base import AsyncCallbackHandler
from typing import Any, Dict, List
from uuid import UUID

class SentryCallbackHandler(AsyncCallbackHandler):
    """
    一个 LangChain 回调处理器,它能将 LangChain 的执行步骤记录为 Sentry 的面包屑。
    """
    async def on_tool_start(
        self,
        serialized: Dict[str, Any],
        input_str: str,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Any:
        # 这里的代码在 LangChain 的异步执行环境中运行
        # 但我们仍然可以访问到由 FastAPI 中间件设置的 context var
        user = user_context.get()
        user_id = user.get("id", "anonymous") if user else "anonymous"

        sentry_sdk.add_breadcrumb(
            category="langchain.tool.start",
            message=f"Tool '{serialized.get('name', 'unknown_tool')}' started.",
            level="info",
            data={
                "user_id": user_id,
                "tool_input": input_str,
                "run_id": str(run_id),
            },
        )
        logger.info(f"[User: {user_id}] Tool start: {serialized.get('name')}")

    async def on_tool_error(
        self,
        error: BaseException,
        *,
        run_id: UUID,
        parent_run_id: Optional[UUID] = None,
        **kwargs: Any,
    ) -> Any:
        user = user_context.get()
        user_id = user.get("id", "anonymous") if user else "anonymous"
        
        # 记录工具级别的错误,并附加上下文
        sentry_sdk.capture_exception(error)
        sentry_sdk.add_breadcrumb(
            category="langchain.tool.error",
            message=f"Tool execution failed for user {user_id}.",
            level="error",
            data={"run_id": str(run_id)},
        )
        logger.error(f"[User: {user_id}] Tool error: {error}", exc_info=True)

# ... 在创建 LangChain agent 时,将这个 handler 加入 callbacks ...
# from langchain.agents import initialize_agent, Tool
# sentry_handler = SentryCallbackHandler()
# agent = initialize_agent(
#     tools=[...],
#     llm=...,
#     callbacks=[sentry_handler]
# )

现在,当我们的 LangChain Agent 执行时,每次调用工具,SentryCallbackHandler 都会被触发。由于 contextvars 的魔力,即使在 langchain 库的内部异步回调中,user_context.get() 也能正确地获取到在 FastAPI 依赖注入函数 get_current_user 中设置的用户信息。

ask_question 端点中 ValueError 被抛出时,Sentry 会创建一个 Issue。点开这个 Issue,你会看到:

  1. User: 明确标识了用户的 ID 和 email。
  2. Tags: 包含我们自定义的 langchain.agentuser_id 标签。
  3. Breadcrumbs (面包屑): 按照时间顺序清晰地列出了请求的处理流程,包括 Authenticated user found...User ... is asking a question...,以及由 SentryCallbackHandler 记录的 langchain.tool.start 等步骤。

这提供了一个完整的、带有丰富用户和业务上下文的错误快照,排查问题的起点从“一个未知错误发生了”变成了“用户 A 在调用工具 B 时,传入参数 C 后系统崩溃了”。

5. 完整的 FastAPI 应用示例

# main.py - 整合所有部分

import os
import logging
import contextvars
from typing import Optional, Dict, Any, List
from uuid import UUID

import sentry_sdk
from fastapi import FastAPI, Request, Depends, HTTPException
from fastapi.responses import RedirectResponse
from starlette.middleware.sessions import SessionMiddleware
from onelogin.saml2.auth import OneLogin_Saml2_Auth
from langchain.callbacks.base import AsyncCallbackHandler

# --- 配置区 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

SENTRY_DSN = os.environ.get("SENTRY_DSN", None)
SESSION_SECRET_KEY = os.environ.get("SESSION_SECRET_KEY", "a_very_secret_key_for_development")

# --- Sentry 和 ContextVar 初始化 ---
if SENTRY_DSN:
    sentry_sdk.init(dsn=SENTRY_DSN, traces_sample_rate=1.0)
    logger.info("Sentry SDK initialized.")

user_context = contextvars.ContextVar("user_context", default=None)

# --- LangChain 自定义回调 ---
class SentryCallbackHandler(AsyncCallbackHandler):
    """一个 LangChain 回调处理器,它能将 LangChain 的执行步骤记录为 Sentry 的面包屑。"""
    async def on_tool_start(
        self, serialized: Dict[str, Any], input_str: str, *,
        run_id: UUID, parent_run_id: Optional[UUID] = None, **kwargs: Any
    ) -> Any:
        user = user_context.get()
        user_id = user.get("id", "anonymous") if user else "anonymous"
        sentry_sdk.add_breadcrumb(
            category="langchain.tool.start",
            message=f"Tool '{serialized.get('name', 'unknown_tool')}' started by {user_id}.",
            level="info", data={"tool_input": input_str, "run_id": str(run_id)}
        )
    # ... on_tool_error, on_llm_start 等其他回调 ...

# --- FastAPI 应用 ---
app = FastAPI(title="Secure LangChain Service")
app.add_middleware(SessionMiddleware, secret_key=SESSION_SECRET_KEY)


# --- SAML 辅助函数 ---
def prepare_saml_request(request: Request) -> dict:
    return {
        'https': 'on' if request.url.scheme == 'https' else 'off',
        'http_host': request.url.netloc,
        'script_name': request.url.path,
        'get_data': dict(request.query_params),
        'post_data': {}
    }

def init_saml_auth(saml_req: dict) -> OneLogin_Saml2_Auth:
    # 避免 CWD 问题,这是 python3-saml 的一个已知问题
    config_path = os.path.join(os.path.dirname(__file__), 'saml_config')
    return OneLogin_Saml2_Auth(saml_req, custom_base_path=config_path)


# --- SAML 路由 ---
@app.post("/api/saml/acs", tags=["SAML"])
async def process_acs(request: Request):
    """Assertion Consumer Service (ACS)"""
    req = prepare_saml_request(request)
    form_data = await request.form()
    req['post_data'] = dict(form_data)
    
    auth = init_saml_auth(req)
    auth.process_response()
    errors = auth.get_errors()

    if errors:
        logger.error(f"SAML ACS Error: {errors}, Reason: {auth.get_last_error_reason()}")
        raise HTTPException(status_code=401, detail=f"SAML Error: {auth.get_last_error_reason()}")

    if not auth.is_authenticated():
        raise HTTPException(status_code=401, detail="Not authenticated via SAML")

    request.session['samlUserdata'] = auth.get_attributes()
    request.session['samlNameId'] = auth.get_nameid()
    
    logger.info(f"User {auth.get_nameid()} authenticated via SAML.")
    relay_state = req['post_data'].get('RelayState', '/')
    return RedirectResponse(url=relay_state)


# --- 安全依赖项 ---
async def get_current_user(request: Request) -> Dict:
    if 'samlUserdata' in request.session:
        user_data = {
            "id": request.session.get('samlNameId'),
            "email": request.session.get('samlNameId'),
            "attributes": request.session.get('samlUserdata')
        }
        
        with sentry_sdk.configure_scope() as scope:
            scope.user = {"id": user_data["id"], "email": user_data["email"]}
        
        user_context.set(user_data)
        return user_data

    req = prepare_saml_request(request)
    auth = init_saml_auth(req)
    return RedirectResponse(url=auth.login(return_to=str(request.url)))


# --- 应用 API ---
@app.get("/api/ask", tags=["LangChain"])
async def ask_question(question: str, user: Dict = Depends(get_current_user)):
    """受保护的 LangChain API 端点,模拟执行并可能触发错误。"""
    current_user_id = user['id']
    sentry_sdk.set_tag("conversation_id", str(UUID(int=0))) # 示例: 关联会话ID

    logger.info(f"User {current_user_id} asking: '{question}'")
    sentry_sdk.add_breadcrumb(category='api.call', message=f'Question received from {current_user_id}.', level='info')

    try:
        # 在这里实例化并运行你的 LangChain Agent
        # agent = initialize_agent(..., callbacks=[SentryCallbackHandler()])
        # await agent.arun(question)
        
        if "critical_error" in question.lower():
            # 模拟一个未捕获的严重错误
            1 / 0
        
        response = f"This is a mocked answer for '{question}' from user {current_user_id}."
        return {"answer": response, "user": current_user_id}
        
    except Exception:
        # 异常会被 Sentry 自动捕获,并且已包含用户上下文
        logger.error(f"Error processing question for user {current_user_id}", exc_info=True)
        # 抛出 HTTP 异常给客户端
        raise HTTPException(status_code=500, detail="An internal error occurred in the LangChain agent.")

@app.get("/", tags=["Public"])
def read_root():
    return {"message": "Welcome. Please access /api/ask to use the service."}

局限性与未来迭代

这个方案成功地将 SAML 认证、LangChain 核心逻辑和 Sentry 可观测性无缝地整合在一起,解决了我们最初的痛点。然而,它并非没有局限性:

  1. 会话管理: 当前的实现依赖于 starlette.middleware.sessions.SessionMiddleware,它默认使用签名的 Cookie 来存储会话。在生产环境中,这不适用于多实例部署。一个更健壮的方案是使用一个集中的会话存储,如 Redis,并配合 SessionMiddleware 的相应后端。
  2. SAML 配置硬编码: 将 SAML 配置放在 JSON 文件中不便于管理。在生产系统中,这些敏感信息应该通过配置中心或环境变量注入,特别是证书和私钥。
  3. 单点登录(SLO): 我们只实现了登录(SSO)流程。一个完整的 SAML 集成还应实现单点登出(SLO),处理来自 IdP 的登出请求,并清理本地会话。
  4. 分布式追踪: 当前的 Sentry 上下文仅限于单个服务内部。如果 LangChain Agent 会调用其他微服务(例如,一个专门的用户画像服务),那么需要引入 OpenTelemetry 并配置 Sentry 的分布式追踪,以将 trace_id 跨服务传递,从而获得真正的端到端可观测性。

尽管存在这些可迭代点,但这个架构为构建安全、可观测、企业级的 LangChain 应用提供了一个坚实的基础。通过 contextvars 这座桥梁,我们成功地打破了 Web 框架层和业务逻辑执行层之间的信息壁垒。


  目录