8.5 完整评估工作流与生产实践
从实验室到生产线:评估体系的最后一公里
前面四节我们分别学习了 RAG 评估的整体框架、检索质量评估、生成质量评估、以及调试与可观测性技术。这些知识像散落的珍珠,现在需要一根线把它们串成一条完整的项链。这一节要解决的核心问题是:如何把评估能力从"开发时手动跑一次"升级为"贯穿整个生命周期的自动化体系"?
想象一下这样的场景:你的企业知识库系统已经上线运行了三个月,某天产品经理告诉你最近用户反馈回答质量下降了。你该怎么办?手动挑几个问题测试一下?那太不靠谱了。正确的做法是:你有一个持续运行的评估管道(Pipeline),每天自动用最新的测试集跑一遍完整评估,生成报告推送到钉钉/Slack,并且一旦某个指标跌破阈值就立刻告警。这就是本节要构建的东西。
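在进入正题之前,先给这个"每日评估 + 告警"流程一个最小化的骨架示意。注意其中 run_full_evaluation 和 WEBHOOK_URL 都是占位假设,本节后面会逐步填充真正的评估逻辑:

```python
# 每日定时评估 + 阈值告警的最小示意
# 假设:run_full_evaluation() 返回 {"指标名": 分数} 字典,
# WEBHOOK_URL 为钉钉/Slack 机器人地址(均为示例占位)
import requests

WEBHOOK_URL = "https://example.com/your-webhook"  # 占位:替换为实际机器人地址
THRESHOLDS = {"faithfulness": 0.80, "hit_rate": 0.85}  # 示例阈值

def daily_eval_and_alert(run_full_evaluation):
    metrics = run_full_evaluation()  # 跑一遍完整评估,返回指标字典
    breaches = {k: v for k, v in metrics.items()
                if k in THRESHOLDS and v < THRESHOLDS[k]}
    text = "📊 每日 RAG 评估: " + ", ".join(f"{k}={v:.3f}" for k, v in metrics.items())
    if breaches:
        text += "\n🚨 以下指标跌破阈值: " + ", ".join(breaches)
    # 钉钉自定义机器人的 text 消息格式;Slack 改为 {"text": text} 即可
    requests.post(WEBHOOK_URL, json={"msgtype": "text", "text": {"content": text}}, timeout=10)
```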
评估全生命周期:五个阶段
一个成熟的 RAG 评估体系不是一次性的事情,而是覆盖从开发到上线再到运维的完整生命周期。我们可以把它划分为五个阶段:
┌─────────────────────────────────────────────────────────────┐
│ RAG 评估全生命周期 │
│ │
│ 阶段1: 基线建立 阶段2: 迭代优化 阶段3: 回归防护 │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ │ 构建初始 │→→│ 每次改动 │→→│ 自动化回归 │ │
│ │ 测试集 │ │ 跑评估 │ │ 测试 │ │
│ │ 跑基线 │ │ 对比差异 │ │ 防止退化 │ │
│ └───────────┘ └───────────┘ └───────────┘ │
│ ↓ ↓ ↓ │
│ 阶段4: 生产监控 阶段5: 持续改进 │
│ ┌───────────┐ ┌───────────┐ │
│ │ 在线采样 │→→│ 定期回顾 │ │
│ │ 实时反馈 │ │ 更新测试集 │ │
│ │ 异常告警 │ │ 优化策略 │ │
│ └───────────┘ └───────────┘ │
└─────────────────────────────────────────────────────────────┘

阶段一:基线建立
基线建立是所有后续工作的参照系。没有基线,你就不知道"好"的标准是什么。
```python
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core.evaluation import (
DatasetGenerator,
RetrieverEvaluator,
BatchEvalRunner,
FaithfulnessEvaluator,
RelevancyEvaluator,
)
from llama_index.llms.openai import OpenAI
import asyncio
import json
from pathlib import Path
from datetime import datetime
class BaselineBuilder:
"""构建评估基线的完整流程"""
def __init__(self, data_dir: str, output_dir: str = "./eval_baselines"):
self.data_dir = data_dir
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
    async def build_baseline(self, num_questions: int = 50):
"""完整的基线建立流程"""
print("=== 阶段1: 加载数据并构建索引 ===")
documents = SimpleDirectoryReader(self.data_dir).load_data()
index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine(similarity_top_k=3)
print("=== 阶段2: 生成评估数据集 ===")
dataset_generator = DatasetGenerator.from_documents(
documents,
llm=OpenAI(model="gpt-4o", temperature=0.7),
num_questions_per_doc=3,
question_gen_query=(
"请根据以下文档内容,生成能够测试RAG系统能力的多样化问题。"
"包括:事实性问题、推理问题、对比问题和细节追问。"
),
)
eval_questions = dataset_generator.generate_questions_from_nodes(
num=num_questions
)
print(f"生成了 {len(eval_questions)} 个评估问题")
print("=== 阶段3: 运行全面评估 ===")
faithfulness_evaluator = FaithfulnessEvaluator(
llm=OpenAI(model="gpt-4o")
)
relevancy_evaluator = RelevancyEvaluator(
llm=OpenAI(model="gpt-4o")
)
runner = BatchEvalRunner(
{
"faithfulness": faithfulness_evaluator,
"relevancy": relevancy_evaluator,
},
workers=8,
)
eval_results = await runner.aevaluate_queries(
query_engine, queries=eval_questions
)
print("=== 阶段4: 计算检索指标 ===")
retriever = index.as_retriever(similarity_top_k=5)
retriever_evaluator = RetrieverEvaluator.from_metric_names(
["mrr", "hit_rate"], retriever=retriever
)
        # 注:aevaluate_dataset 期望带"问题 → 相关节点"标注的 QA 数据集
        # (如 generate_question_context_pairs 产出的 EmbeddingQAFinetuneDataset),
        # 此处假设 eval_questions 已转换为该格式
        retrieval_results = await retriever_evaluator.aevaluate_dataset(
            eval_questions, show_progress=True
        )
print("=== 阶段5: 保存基线结果 ===")
baseline_data = {
"timestamp": datetime.now().isoformat(),
"config": {
"data_dir": str(self.data_dir),
"num_questions": num_questions,
"similarity_top_k": 3,
"llm_model": "gpt-4o",
},
"generation_metrics": {
"faithfulness": eval_results["faithfulness"].get_average_score(),
"relevancy": eval_results["relevancy"].get_average_score(),
},
"retrieval_metrics": {
"mrr": retrieval_results.get_average_score("mrr"),
"hit_rate": retrieval_results.get_average_score("hit_rate"),
},
"per_question_details": self._extract_per_question_details(
eval_results, retrieval_results, eval_questions
),
}
baseline_file = self.output_dir / f"baseline_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(baseline_file, "w", encoding="utf-8") as f:
json.dump(baseline_data, f, ensure_ascii=False, indent=2, default=str)
latest_file = self.output_dir / "baseline_latest.json"
with open(latest_file, "w", encoding="utf-8") as f:
json.dump(baseline_data, f, ensure_ascii=False, indent=2, default=str)
self._print_summary(baseline_data)
return baseline_data
def _extract_per_question_details(self, gen_results, ret_results, questions):
details = []
for i, q in enumerate(questions):
detail = {
"question": q,
"faithfulness": gen_results["faithfulness"].results[i].score if i < len(gen_results["faithfulness"].results) else None,
"relevancy": gen_results["relevancy"].results[i].score if i < len(gen_results["relevancy"].results) else None,
"mrr": ret_results.results[i].metrics.get("mrr") if i < len(ret_results.results) else None,
"hit_rate": ret_results.results[i].metrics.get("hit_rate") if i < len(ret_results.results) else None,
}
details.append(detail)
return details
def _print_summary(self, baseline_data):
print("\n" + "=" * 60)
print(f"📊 基线评估完成!时间戳: {baseline_data['timestamp']}")
print("=" * 60)
print("\n【生成质量指标】")
print(f" 忠实度 (Faithfulness): {baseline_data['generation_metrics']['faithfulness']:.4f}")
print(f" 相关性 (Relevancy): {baseline_data['generation_metrics']['relevancy']:.4f}")
print("\n【检索质量指标】")
print(f" MRR: {baseline_data['retrieval_metrics']['mrr']:.4f}")
print(f" Hit Rate: {baseline_data['retrieval_metrics']['hit_rate']:.4f}")
print("=" * 60)
# 使用示例(build_baseline 为异步方法,需用 asyncio.run 驱动)
builder = BaselineBuilder("./data/company_kb")
baseline = asyncio.run(builder.build_baseline(num_questions=50))
```

这个 BaselineBuilder 类做了五件事:加载数据构建索引、用 LLM 自动生成评估问题、同时跑生成质量和检索质量的批量评估、保存结果到 JSON 文件、打印摘要报告。其中有一个容易被忽略的细节:每个问题的详细分数也被保存了下来,而不仅仅是平均值。为什么这很重要?因为平均值会掩盖个体差异——比如你可能整体 Faithfulness 是 0.85 看起来不错,但其中有 10% 的问题得分低于 0.5,这些"长尾坏案例"恰恰是最需要关注的。
阶段二:迭代优化中的对比评估
当你修改了 chunking 策略、换了 embedding 模型、或者调整了 reranker 的参数之后,你需要知道这些改动到底是变好了还是变坏了。这就需要做 A/B 对比评估:
```python
import json
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass
from typing import Optional
from enum import Enum
class ComparisonResult(Enum):
IMPROVED = "improved"
DEGRADED = "degraded"
NEUTRAL = "neutral"
@dataclass
class MetricDelta:
metric_name: str
old_value: float
new_value: float
delta: float
delta_pct: float
result: ComparisonResult
significance: bool
class EvaluationComparator:
"""对比两次评估结果的工具"""
def __init__(
self,
baseline_path: str,
threshold_pct: float = 2.0,
strict_mode: bool = False,
):
self.baseline_path = Path(baseline_path)
self.threshold_pct = threshold_pct
self.strict_mode = strict_mode
def load_baseline(self):
with open(self.baseline_path, "r", encoding="utf-8") as f:
return json.load(f)
def compare(self, new_results: dict, baseline: Optional[dict] = None) -> dict:
if baseline is None:
baseline = self.load_baseline()
report = {
"comparison_time": datetime.now().isoformat(),
"baseline_timestamp": baseline.get("timestamp", "unknown"),
"threshold_pct": self.threshold_pct,
"summary": {"improved": [], "degraded": [], "neutral": []},
"deltas": [],
"verdict": None,
"recommendations": [],
}
gen_deltas = self._compare_metric_group(
baseline["generation_metrics"],
new_results.get("generation_metrics", {}),
"generation",
)
report["deltas"].extend(gen_deltas)
ret_deltas = self._compare_metric_group(
baseline["retrieval_metrics"],
new_results.get("retrieval_metrics", {}),
"retrieval",
)
report["deltas"].extend(ret_deltas)
for d in report["deltas"]:
if d.result == ComparisonResult.IMPROVED:
report["summary"]["improved"].append(d.metric_name)
elif d.result == ComparisonResult.DEGRADED:
report["summary"]["degraded"].append(d.metric_name)
else:
report["summary"]["neutral"].append(d.metric_name)
report["verdict"] = self._make_verdict(report)
report["recommendations"] = self._generate_recommendations(report)
self._print_comparison_report(report)
return report
def _compare_metric_group(
self, old_metrics: dict, new_metrics: dict, group_name: str
) -> list:
deltas = []
for name, old_val in old_metrics.items():
if name not in new_metrics:
continue
new_val = new_metrics[name]
delta = new_val - old_val
delta_pct = (delta / old_val * 100) if old_val != 0 else 0
            if abs(delta_pct) >= self.threshold_pct:
                result = (
                    ComparisonResult.IMPROVED if delta > 0 else ComparisonResult.DEGRADED
                )
                significance = True
            elif self.strict_mode and delta < 0:
                # 严格模式:低于显著性阈值的负向变化也按退化处理(但不计入显著变化)
                result = ComparisonResult.DEGRADED
                significance = False
            else:
                result = ComparisonResult.NEUTRAL
                significance = False
deltas.append(
MetricDelta(
metric_name=f"{group_name}.{name}",
old_value=old_val,
new_value=new_val,
delta=delta,
delta_pct=delta_pct,
result=result,
significance=significance,
)
)
return deltas
def _make_verdict(self, report: dict) -> str:
degraded_count = len(report["summary"]["degraded"])
improved_count = len(report["summary"]["improved"])
if degraded_count == 0:
if improved_count > 0:
return "✅ PASS — 所有指标持平或提升,可以发布"
else:
return "⚪ NEUTRAL — 无显著变化,风险可控"
elif degraded_count <= 1 and improved_count >= 2:
return "⚠️ CONDITIONAL — 有少量退化但整体改善明显,建议人工审核后决定"
else:
return "❌ FAIL — 存在显著退化,不建议发布"
def _generate_recommendations(self, report: dict) -> list:
recommendations = []
for d in report["deltas"]:
if d.result == ComparisonResult.DEGRADED:
if "faithfulness" in d.metric_name:
recommendations.append(
f"🔴 {d.metric_name} 下降了 {abs(d.delta_pct):.1f}% "
f"({d.old_value:.4f} → {d.new_value:.4f})。"
f"建议检查:(1) 检索到的上下文是否包含足够信息;"
f"(2) Prompt是否引导模型基于上下文作答;"
f"(3) 是否需要增加reranking来过滤噪声节点。"
)
elif "relevancy" in d.metric_name:
recommendations.append(
f"🔴 {d.metric_name} 下降了 {abs(d.delta_pct):.1f}%。"
f"建议检查:(1) top_k是否过小导致相关内容被截断;"
f"(2) embedding模型是否匹配当前语言/领域;"
f"(3) 是否需要启用Hybrid Search补充关键词召回。"
)
elif "mrr" in d.metric_name or "hit_rate" in d.metric_name:
recommendations.append(
f"🔴 {d.metric_name} 下降了 {abs(d.delta_pct):.1f}%。"
f"建议检查:(1) 数据是否有更新导致embedding过期;"
f"(2) chunk_size是否合适(过大降低精度);"
f"(3) 是否有新的查询模式未被现有索引覆盖。"
)
elif d.result == ComparisonResult.IMPROVED and d.significance:
recommendations.append(
f"🟢 {d.metric_name} 提升了 {d.delta_pct:.1f}% "
f"({d.old_value:.4f} → {d.new_value:.4f}) ✨"
)
return recommendations
def _print_comparison_report(self, report: dict):
print("\n" + "=" * 70)
print("📋 评估对比报告")
print("=" * 70)
print(f"\n基线时间: {report['baseline_timestamp']}")
print(f"阈值设置: ±{self.threshold_pct}% 为显著变化\n")
print(f"{'指标':<30} {'基线值':>10} {'新值':>10} {'变化率':>10} {'状态':>10}")
print("-" * 70)
for d in report["deltas"]:
status_icon = {
ComparisonResult.IMPROVED: "🟢↑",
ComparisonResult.DEGRADED: "🔴↓",
ComparisonResult.NEUTRAL: "⚪→",
}[d.result]
print(
f"{d.metric_name:<30} {d.old_value:>10.4f} {d.new_value:>10.4f} "
f"{d.delta_pct:>+9.1f}% {status_icon:>10}"
)
print("\n" + "-" * 70)
print(f"\n📊 总体判定: {report['verdict']}")
if report["recommendations"]:
print("\n💡 建议:")
for rec in report["recommendations"]:
print(f" {rec}")
print("=" * 70)
# 使用示例
comparator = EvaluationComparator(
baseline_path="./eval_baselines/baseline_latest.json",
threshold_pct=2.0,
strict_mode=True,
)
# report = comparator.compare(new_results)
```

这个对比器的核心设计思路是:不只是告诉你是好是坏,还要告诉你具体哪里变了、变化了多少、以及应该怎么修。注意其中的 _make_verdict 方法——它不是简单地看有没有退化的指标,而是做了分级判断:零退化直接通过、少量退化但大量提升则条件通过(需要人工审核)、多维度退化则拒绝。这种判断逻辑在实际工程中非常实用,因为现实中的改动往往是有得有失的,一刀切的"有任何退化就不让上线"会导致团队不敢做任何尝试。
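为了直观感受 verdict 的分级效果,可以用一份手工构造的假结果跑一遍对比(数值纯属演示,实际输出取决于你的基线):

```python
# 手工构造一份"部分提升、部分退化"的新结果,观察分级判定
fake_new_results = {
    "generation_metrics": {"faithfulness": 0.88, "relevancy": 0.91},
    "retrieval_metrics": {"mrr": 0.72, "hit_rate": 0.90},
}
report = comparator.compare(fake_new_results)
print(report["verdict"])  # 取决于与基线的差值,可能输出 "⚠️ CONDITIONAL — ..." 等
```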
阶段三:回归防护——CI/CD 集成
当评估能力成熟之后,下一步就是把它嵌入到 CI/CD 流水线中,实现每次代码提交或合并请求都自动跑评估。下面是一个完整的 GitHub Actions 工作流配置:
```yaml
# .github/workflows/rag-evaluation.yml
name: RAG Evaluation Pipeline
on:
pull_request:
branches: [main]
push:
branches: [main]
paths:
- "rag_pipeline/**"
- "evaluation/**"
- "data/**"
jobs:
evaluate:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
cache: "pip"
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install llama-index llama-index-embeddings-openai
pip install llama-index-readers-file llama-index-vector-stores-chroma
pip install ragas openai datasets
      # 注:API key 由后续各 step 的 env 直接注入,无需写入 GITHUB_ENV(减少泄露面)
- name: Run baseline comparison
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
run: |
python evaluation/run_evaluation.py \
--data-dir ./data/knowledge_base \
--baseline ./eval_baselines/baseline_latest.json \
--output ./eval_results \
--threshold 2.0 \
--strict
- name: Upload evaluation results
uses: actions/upload-artifact@v4
with:
name: evaluation-results
path: ./eval_results/
retention-days: 30
- name: Comment PR with results
if: github.event_name == 'pull_request'
uses: actions/github-script@v7
with:
script: |
const fs = require('fs');
const resultPath = './eval_results/comparison_report.json';
if (!fs.existsSync(resultPath)) {
console.log('No evaluation results found');
return;
}
const report = JSON.parse(fs.readFileSync(resultPath, 'utf8'));
let body = '## 📊 RAG 评估报告\n\n';
body += `**基线**: ${report.baseline_timestamp}\n\n`;
body += `### 总体判定: ${report.verdict}\n\n`;
body += '| 指标 | 基线 | 当前 | 变化 | 状态 |\n';
body += '|------|------|------|------|------|\n';
for (const d of report.deltas) {
const icon = d.result === 'improved' ? '🟢' :
d.result === 'degraded' ? '🔴' : '⚪';
body += `| ${d.metric_name} | ${d.old_value} | ${d.new_value} | ${d.delta_pct > 0 ? '+' : ''}${d.delta_pct}% | ${icon} |\n`;
}
if (report.recommendations && report.recommendations.length > 0) {
body += '\n### 💡 建议\n';
for (const rec of report.recommendations) {
body += `- ${rec}\n`;
}
}
github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: body
});
- name: Check evaluation gate
id: check_gate
run: |
python -c "
import json
with open('./eval_results/comparison_report.json') as f:
report = json.load(f)
verdict = report.get('verdict', '')
if 'FAIL' in verdict:
print('::error::Evaluation FAILED - significant degradation detected')
exit(1)
elif 'CONDITIONAL' in verdict:
print('::warning::Evaluation CONDITIONAL - manual review required')
exit(0)
else:
print('Evaluation PASSED')
exit(0)
"
- name: Notify on failure
if: failure()
run: |
echo "RAG 评估未通过!请在 PR 中查看详细报告。"
          echo "如果这是预期内的改动,请联系团队负责人审批豁免。"
```

对应的 Python 入口脚本 run_evaluation.py:
```python
#!/usr/bin/env python3
"""CI/CD 评估流水线入口"""
import argparse
import asyncio
import json
import sys
from pathlib import Path
from datetime import datetime
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core.evaluation import (
DatasetGenerator,
RetrieverEvaluator,
BatchEvalRunner,
FaithfulnessEvaluator,
RelevancyEvaluator,
)
from llama_index.llms.openai import OpenAI
async def main():
parser = argparse.ArgumentParser(description="RAG 评估流水线")
parser.add_argument("--data-dir", required=True, help="数据目录")
parser.add_argument("--baseline", required=True, help="基线文件路径")
parser.add_argument("--output", default="./eval_results", help="输出目录")
parser.add_argument("--threshold", type=float, default=2.0, help="显著性阈值(%)")
parser.add_argument("--questions", type=int, default=50, help="评估问题数量")
parser.add_argument("--strict", action="store_true", help="严格模式")
args = parser.parse_args()
output_dir = Path(args.output)
output_dir.mkdir(parents=True, exist_ok=True)
print(f"[1/4] 加载数据: {args.data_dir}")
documents = SimpleDirectoryReader(args.data_dir).load_data()
print(f"[2/4] 构建索引...")
index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine(similarity_top_k=3)
print(f"[3/4] 运行评估 ({args.questions} 个问题)...")
faithfulness_eval = FaithfulnessEvaluator(llm=OpenAI(model="gpt-4o"))
relevancy_eval = RelevancyEvaluator(llm=OpenAI(model="gpt-4o"))
runner = BatchEvalRunner(
{"faithfulness": faithfulness_eval, "relevancy": relevancy_eval},
workers=8,
)
question_file = Path(args.baseline).parent / "eval_questions.json"
if question_file.exists():
with open(question_file, "r", encoding="utf-8") as f:
eval_questions = json.load(f)
print(f" 使用已有问题集: {len(eval_questions)} 个问题")
else:
print(" 生成新的评估问题集...")
generator = DatasetGenerator.from_documents(
documents, llm=OpenAI(model="gpt-4o", temperature=0.7)
)
        eval_questions = generator.generate_questions_from_nodes(num=args.questions)
with open(question_file, "w", encoding="utf-8") as f:
json.dump(eval_questions, f, ensure_ascii=False)
eval_results = await runner.aevaluate_queries(query_engine, queries=eval_questions)
    retriever = index.as_retriever(similarity_top_k=5)
    ret_evaluator = RetrieverEvaluator.from_metric_names(
        ["mrr", "hit_rate"], retriever=retriever
    )
    # 同 BaselineBuilder:假设 eval_questions 已转换为带标注的 QA 数据集格式
    ret_results = await ret_evaluator.aevaluate_dataset(eval_questions)
new_results = {
"timestamp": datetime.now().isoformat(),
"generation_metrics": {
"faithfulness": eval_results["faithfulness"].get_average_score(),
"relevancy": eval_results["relevancy"].get_average_score(),
},
"retrieval_metrics": {
"mrr": ret_results.get_average_score("mrr"),
"hit_rate": ret_results.get_average_score("hit_rate"),
},
}
print(f"[4/4] 对比基线...")
from evaluation.comparator import EvaluationComparator
comparator = EvaluationComparator(
baseline_path=args.baseline,
threshold_pct=args.threshold,
strict_mode=args.strict,
)
report = comparator.compare(new_results)
    # MetricDelta 是 dataclass,先转成普通 dict(枚举取 .value),
    # 保证 JSON 能被 PR 评论脚本等后续 step 直接消费
    from dataclasses import asdict
    report["deltas"] = [{**asdict(d), "result": d.result.value} for d in report["deltas"]]
    report_file = output_dir / "comparison_report.json"
    with open(report_file, "w", encoding="utf-8") as f:
        json.dump(report, f, ensure_ascii=False, indent=2, default=str)
raw_file = output_dir / "raw_results.json"
with open(raw_file, "w", encoding="utf-8") as f:
json.dump(new_results, f, ensure_ascii=False, indent=2, default=str)
print(f"\n报告已保存至: {report_file}")
if "FAIL" in report["verdict"]:
sys.exit(1)
sys.exit(0)
if __name__ == "__main__":
    asyncio.run(main())
```

这里有几个工程上的关键点值得展开讨论。首先是**评估问题集的管理策略**:上面的代码中,我们第一次运行时会用 LLM 生成问题集并缓存到文件里,后续每次 CI 跑的时候复用同一份问题集。这样做的原因是:如果每次都用 LLM 重新生成问题,那么两次评估之间的差异可能来自"问题不同"而不是"系统变化",这会让对比失去意义。当然,问题集本身也需要定期更新——比如每两周人工审核一次,加入新出现的问题类型。
其次是**退出码的设计**:评估脚本通过 sys.exit(1) 在 FAIL 时返回非零退出码,这样 GitHub Actions 的后续 step 就能感知到失败并触发通知。而 CONDITIONAL 的情况返回 0 但打印 warning,这意味着不会阻断 CI,但会在日志中留下痕迹。
第三个容易踩坑的地方是**CI 环境中的 API 调用量控制**。上面这个流水线每次 PR 都会调用 OpenAI API 跑 50 个问题的评估,假设每个问题需要 2 次 LLM 调用(faithfulness + relevancy 各一次),那就是 100 次额外的 GPT-4o 调用。如果你的团队很活跃,一天可能有几十个 PR,API 费用会快速累积。解决方案包括:(1)只对 main 分支的推送跑完整评估,PR 上只跑轻量级抽样(比如 10 个问题);(2)使用更便宜的模型(如 gpt-4o-mini)跑评估;(3)对同一个 PR 的重复提交做去重,只在最新 commit 上跑。
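针对第(1)(2)条策略,下面是一个按 CI 触发场景选择评估规模与评判模型的示意(GITHUB_EVENT_NAME 由 GitHub Actions 自动注入;固定随机种子保证不同 PR 抽到同一批题,便于横向对比):

```python
import os
import random

def select_eval_config(all_questions: list[str]) -> tuple[list[str], str]:
    """按 CI 触发场景返回 (问题子集, 评判模型名)"""
    if os.getenv("GITHUB_EVENT_NAME") != "pull_request":
        return all_questions, "gpt-4o"           # main 分支推送:全量 + 强模型
    rng = random.Random(42)                       # 固定种子:各 PR 抽到同一批题
    sample = rng.sample(all_questions, min(10, len(all_questions)))
    return sample, "gpt-4o-mini"                  # PR:轻量抽样 + 便宜模型
```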
阶段四:生产环境监控
系统上线之后,评估并没有结束——相反,真正的挑战才刚刚开始。生产环境的评估核心难点在于:你没有标准答案可以对比。在开发阶段你可以精心编写 reference answer,但在生产中用户的提问千奇百怪,不可能预先准备好答案。所以生产监控需要换一套方法论。
在线评估策略一:用户隐式反馈信号
最简单也最实用的在线评估方法是收集用户的隐式反馈——用户虽然不会明确告诉你"这个回答好不好",但他们的行为会透露线索:
```python
from dataclasses import dataclass
from typing import Optional
from collections import defaultdict
from datetime import datetime, timedelta
import statistics
import json
from pathlib import Path

# 延迟分位数的指标键名,供监控器、改进闭环与仪表盘共用
p95_key = "p95_latency_ms"
p99_key = "p99_latency_ms"
@dataclass
class QueryRecord:
query_id: str
query_text: str
response_text: str
source_nodes_count: int
response_latency_ms: float
timestamp: datetime
user_id: Optional[str] = None
session_id: Optional[str] = None
copied: bool = False
liked: Optional[bool] = None
follow_up_asked: bool = False
rephrased: bool = False
abandoned: bool = False
feedback_delay_s: float = 0.0
class ProductionMonitor:
"""生产环境 RAG 质量监控器"""
def __init__(self, window_minutes: int = 60):
self.window_minutes = window_minutes
self.records: list[QueryRecord] = []
self.alert_thresholds = {
"avg_latency_ms": 5000,
"copy_rate": 0.15,
"abandon_rate": 0.40,
"rephrase_rate": 0.20,
}
def record_query(self, record: QueryRecord):
self.records.append(record)
cutoff = datetime.now() - timedelta(minutes=self.window_minutes)
self.records = [r for r in self.records if r.timestamp >= cutoff]
def get_health_report(self) -> dict:
if not self.records:
return {"status": "no_data", "message": "暂无数据"}
window_start = min(r.timestamp for r in self.records)
total = len(self.records)
copy_count = sum(1 for r in self.records if r.copied)
abandon_count = sum(1 for r in self.records if r.abandoned)
rephrase_count = sum(1 for r in self.records if r.rephrased)
follow_up_count = sum(1 for r in self.records if r.follow_up_asked)
latencies = [r.response_latency_ms for r in self.records]
report = {
"status": "healthy",
"window": f"{window_start.strftime('%H:%M')} ~ {datetime.now().strftime('%H:%M')}",
"total_queries": total,
"metrics": {
"avg_latency_ms": statistics.mean(latencies),
"p50_latency_ms": statistics.median(latencies),
p95_key: sorted(latencies)[int(len(latencies) * 0.95)] if latencies else 0,
p99_key: sorted(latencies)[int(len(latencies) * 0.99)] if latencies else 0,
"copy_rate": copy_count / total,
"abandon_rate": abandon_count / total,
"rephrase_rate": rephrase_count / total,
"follow_up_rate": follow_up_count / total,
},
"alerts": [],
"trends": self._compute_trends(),
}
metrics = report["metrics"]
if metrics["avg_latency_ms"] > self.alert_thresholds["avg_latency_ms"]:
report["alerts"].append({
"level": "warning",
"metric": "avg_latency_ms",
"value": metrics["avg_latency_ms"],
"threshold": self.alert_thresholds["avg_latency_ms"],
"message": f"平均响应延迟 {metrics['avg_latency_ms']:.0f}ms 超过阈值",
})
if metrics["copy_rate"] < self.alert_thresholds["copy_rate"]:
report["alerts"].append({
"level": "warning",
"metric": "copy_rate",
"value": metrics["copy_rate"],
"message": f"复制率 {metrics['copy_rate']*100:.1f}% 偏低,回答质量可能下降",
})
if metrics["abandon_rate"] > self.alert_thresholds["abandon_rate"]:
report["alerts"].append({
"level": "critical",
"metric": "abandon_rate",
"value": metrics["abandon_rate"],
"message": f"放弃率 {metrics['abandon_rate']*100:.1f}% 过高",
})
if metrics["rephrase_rate"] > self.alert_thresholds["rephrase_rate"]:
report["alerts"].append({
"level": "warning",
"metric": "rephrase_rate",
"value": metrics["rephrase_rate"],
"message": f"改写重问率 {metrics['rephrase_rate']*100:.1f}% 偏高",
})
if report["alerts"]:
max_level = max(a["level"] for a in report["alerts"])
report["status"] = "critical" if max_level == "critical" else "degraded"
return report
def _compute_trends(self) -> dict:
half = len(self.records) // 2
if half < 10:
return {"status": "insufficient_data"}
first_half = self.records[:half]
second_half = self.records[half:]
trends = {}
old_lat = statistics.mean(r.response_latency_ms for r in first_half)
new_lat = statistics.mean(r.response_latency_ms for r in second_half)
change_pct = ((new_lat - old_lat) / old_lat * 100) if old_lat > 0 else 0
trends["latency"] = {"change_pct": change_pct, "direction": "↑" if change_pct > 3 else ("↓" if change_pct < -3 else "→")}
old_copy = sum(1 for r in first_half if r.copied) / len(first_half)
new_copy = sum(1 for r in second_half if r.copied) / len(second_half)
copy_change = ((new_copy - old_copy) / old_copy * 100) if old_copy > 0 else 0
trends["copy"] = {"change_pct": copy_change, "direction": "↑" if copy_change > 3 else ("↓" if copy_change < -3 else "→")}
old_abandon = sum(1 for r in first_half if r.abandoned) / len(first_half)
new_abandon = sum(1 for r in second_half if r.abandoned) / len(second_half)
ab_change = ((new_abandon - old_abandon) / old_abandon * 100) if old_abandon > 0 else 0
trends["abandon"] = {"change_pct": ab_change, "direction": "↑" if ab_change > 3 else ("↓" if ab_change < -3 else "→")}
return trends
def export_bad_cases(self, top_n: int = 20) -> list[dict]:
scored = []
for r in self.records:
score = 0.0
if r.abandoned and not r.copied:
score += 3.0
if r.rephrased:
score += 2.0
if r.response_latency_ms > 8000:
score += 1.0
if r.liked is False:
score += 3.0
scored.append((score, r))
scored.sort(key=lambda x: x[0], reverse=True)
bad_cases = []
for score, r in scored[:top_n]:
bad_cases.append({
"risk_score": round(score, 2),
"query": r.query_text,
"response_preview": r.response_text[:200] + "...",
"signals": {
"copied": r.copied,
"liked": r.liked,
"abandoned": r.abandoned,
"rephrased": r.rephrased,
"latency_ms": r.response_latency_ms,
},
"timestamp": r.timestamp.isoformat(),
})
return bad_cases
class MonitoredQueryEngine:
"""带监控功能的查询引擎包装器"""
def __init__(self, query_engine, monitor: ProductionMonitor):
self.query_engine = query_engine
self.monitor = monitor
self._id_counter = 0
def query(self, query_str: str, user_id: str = None, session_id: str = None):
import time
start = time.perf_counter()
response = self.query_engine.query(query_str)
latency_ms = (time.perf_counter() - start) * 1000
record = QueryRecord(
query_id=f"q_{self._id_counter}",
query_text=query_str,
response_text=response.response,
source_nodes_count=len(response.source_nodes),
response_latency_ms=latency_ms,
timestamp=datetime.now(),
user_id=user_id,
session_id=session_id,
)
self._id_counter += 1
self.monitor.record_query(record)
        if response.metadata is None:
            response.metadata = {}
        response.metadata["_monitor_record_id"] = record.query_id
return response
def record_feedback(self, record_id: str, **kwargs):
for r in self.monitor.records:
if r.query_id == record_id:
for k, v in kwargs.items():
if hasattr(r, k):
setattr(r, k, v)
                break
```

这套隐式反馈系统的设计哲学是:不依赖用户的主动评价行为(因为大多数用户懒得点好评/差评),而是从用户的行为模式中推断满意度。其中几个信号的解读逻辑值得仔细说明:
- 复制率(Copy Rate):如果用户复制了回答的内容,这是一个强正信号——说明回答中有用户认为有价值的信息。一般来说,知识库问答场景的复制率应该在 20%-40% 之间,低于 15% 就需要警惕。
- 放弃率(Abandon Rate):用户收到回答后没有任何后续操作(不复制、不追问、不改写),直接离开了。这通常意味着回答完全没用。
- 改写重问率(Rephrase Rate):用户收到回答后换了一种方式再问同样的问题。这说明首次回答可能方向对了但不够准确,或者是用户觉得系统没理解他的意思。
- 响应延迟的 P99:P99 延迟比平均延迟更重要,因为那代表了最慢的那 1% 用户体验。如果 P99 超过 10 秒,即使平均只有 2 秒,也会有相当比例的用户感到不耐烦。(这些信号如何从前端回传到监控器,见下面的示例。)
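下面是一个把前端埋点事件回传给 MonitoredQueryEngine 的最小示意。事件结构是假设的,query_engine 指前文构建的任意查询引擎:

```python
# 假设前端事件结构为 {"record_id": "...", "event": "copy" / "thumbs_down" / ...}
monitor = ProductionMonitor(window_minutes=60)
engine = MonitoredQueryEngine(query_engine, monitor)  # query_engine 来自前文

response = engine.query("年假有多少天?", user_id="u_001")
record_id = response.metadata["_monitor_record_id"]

# 前端事件名 → QueryRecord 字段的映射
EVENT_TO_FIELDS = {
    "copy":        {"copied": True},
    "thumbs_up":   {"liked": True},
    "thumbs_down": {"liked": False},
    "follow_up":   {"follow_up_asked": True},
    "rephrase":    {"rephrased": True},
    "leave":       {"abandoned": True},
}

def on_frontend_event(event: dict):
    fields = EVENT_TO_FIELDS.get(event["event"], {})
    if fields:
        engine.record_feedback(event["record_id"], **fields)

on_frontend_event({"record_id": record_id, "event": "copy"})
```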
在线评估策略二:LLM-as-Judge 自动打分
对于重要的客户(比如 VIP 企业用户),隐式反馈可能不够精细。这时可以用 LLM-as-Judge 方法,在生产环境中对部分查询做实时质量打分:
````python
import asyncio
import json
import statistics
from datetime import datetime, timedelta
from typing import Optional
from dataclasses import dataclass, field
from llama_index.llms.openai import OpenAI
from llama_index.core.prompts import PromptTemplate
@dataclass
class QualityScore:
    query_id: str
    overall_score: float
    faithfulness: float
    relevance: float
    completeness: float
    reasoning: str
    improvement_suggestion: str
    created_at: datetime = field(default_factory=datetime.now)
# 注意:PromptTemplate 会把单层大括号视为变量占位符,
# 因此示例 JSON 中的字面大括号必须写成 {{ }} 进行转义
JUDGE_PROMPT = PromptTemplate(
    """你是一个专业的 RAG 系统质量评审员。请对以下问答进行评分。
## 用户问题
{query}
## 系统回答
{response}
## 检索到的参考上下文
{context}
## 评分标准(每个维度1-5分)
- 忠实度(Faithfulness):回答是否严格基于提供的上下文,有无幻觉
- 相关性(Relevance):回答是否针对用户问题,有无偏题
- 完整性(Completeness):回答是否充分回应了问题的各个方面
- 综合质量(Overall):综合考虑以上维度的整体质量
请以JSON格式输出评分结果:
```json
{{
    "overall_score": <1-5>,
    "faithfulness": <1-5>,
    "relevance": <1-5>,
    "completeness": <1-5>,
    "reasoning": "<简要说明评分理由>",
    "improvement_suggestion": "<如果质量不佳,给出改进建议>"
}}
```"""
)
class OnlineQualityJudge:
    """在线质量评判器(异步,不阻塞主流程)"""
def __init__(
self,
llm: OpenAI = None,
sample_rate: float = 0.1,
batch_size: int = 5,
):
self.llm = llm or OpenAI(model="gpt-4o-mini")
self.sample_rate = sample_rate
self.batch_size = batch_size
self.pending: list[dict] = []
self.scores: list[QualityScore] = []
def should_judge(self) -> bool:
import random
return random.random() < self.sample_rate
async def submit_for_judging(
self,
query_id: str,
query_text: str,
response_text: str,
context_texts: list[str],
):
context = "\n---\n".join(context_texts[:3])
self.pending.append({
"query_id": query_id,
"query_text": query_text,
"response_text": response_text,
"context": context,
})
if len(self.pending) >= self.batch_size:
await self._process_batch()
async def _process_batch(self):
if not self.pending:
return
batch = self.pending[:self.batch_size]
self.pending = self.pending[self.batch_size:]
tasks = [self._judge_one(item) for item in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, QualityScore):
self.scores.append(result)
else:
print(f"评判失败: {result}")
async def _judge_one(self, item: dict) -> QualityScore:
prompt = JUDGE_PROMPT.format(
query=item["query_text"],
response=item["response_text"],
context=item["context"],
)
response = await self.llm.acomplete(prompt)
try:
import re
json_match = re.search(r'```json\s*(.*?)\s*```', response.text, re.DOTALL)
if json_match:
data = json.loads(json_match.group(1))
else:
data = json.loads(response.text.strip())
return QualityScore(
query_id=item["query_id"],
overall_score=data.get("overall_score", 0),
faithfulness=data.get("faithfulness", 0),
relevance=data.get("relevance", 0),
completeness=data.get("completeness", 0),
reasoning=data.get("reasoning", ""),
improvement_suggestion=data.get("improvement_suggestion", ""),
)
except (json.JSONDecodeError, KeyError) as e:
print(f"解析评判结果失败: {e}, 原始文本: {response.text[:200]}")
return QualityScore(
query_id=item["query_id"],
overall_score=0, faithfulness=0, relevance=0,
completeness=0, reasoning=f"Parse error: {e}",
improvement_suggestion="",
)
    def get_quality_stats(self, hours: int = 1) -> dict:
        cutoff = datetime.now() - timedelta(hours=hours)
        # 按 QualityScore.created_at 过滤时间窗口内的评分
        recent = [s for s in self.scores if s.created_at >= cutoff]
        if not recent:
            return {"status": "no_recent_scores"}
return {
"total_judged": len(recent),
"avg_overall": statistics.mean(s.overall_score for s in recent),
"avg_faithfulness": statistics.mean(s.faithfulness for s in recent),
"avg_relevance": statistics.mean(s.relevance for s in recent),
"score_distribution": self._count_distribution(s.overall_score for s in recent),
"common_issues": self._extract_common_issues(recent),
}
    def _count_distribution(self, scores) -> dict:
        dist = {"5(优秀)": 0, "4(良好)": 0, "3(一般)": 0, "2(较差)": 0, "1(很差)": 0}
        keys = list(dist.keys())  # 按 5 → 1 排列
        for s in scores:
            bucket = max(1, min(int(s), 5))  # 裁剪到 1-5;解析失败的 0 分归入"1(很差)"
            dist[keys[5 - bucket]] += 1
        return dist
def _extract_common_issues(self, scores: list) -> list[str]:
issues = []
low_faith = sum(1 for s in scores if s.faithfulness <= 2)
low_rel = sum(1 for s in scores if s.relevance <= 2)
total = len(scores)
if low_faith / total > 0.2:
issues.append(f"忠实度偏低 ({low_faith}/{total}),可能存在幻觉问题")
if low_rel / total > 0.2:
issues.append(f"相关性偏低 ({low_rel}/{total}),检索或理解可能偏离主题")
return issues
````

这个在线评判器的设计要点是:**抽样而非全量**(用 `sample_rate=0.1` 只评判 10% 的查询以控制成本)、**批量处理**(攒够一批再发送,减少 API 调用次数)、**异步非阻塞**(评判操作不影响主查询流程的延迟)、**用便宜模型**(评判用的是 gpt-4o-mini 而不是 gpt-4o,因为评判任务不需要最强的推理能力)。这里有一个实际部署时需要注意的问题:评判结果的解析要足够鲁棒——LLM 输出的 JSON 格式不一定完全规范,所以代码中先用正则提取 ```json 代码块中的内容作为第一选择,再尝试直接 parse 整段文本作为 fallback。
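把评判器挂到前文 MonitoredQueryEngine 的查询路径上,大致是下面这个样子。fire-and-forget 的后台任务写法仅为示意,生产中还需持有 task 引用并处理异常与优雅关闭:

```python
import asyncio

judge = OnlineQualityJudge(sample_rate=0.1, batch_size=5)

async def query_with_judging(engine: MonitoredQueryEngine, query_str: str):
    response = engine.query(query_str)
    if judge.should_judge():
        # 评判在后台进行,不增加用户可感知的响应延迟
        asyncio.create_task(judge.submit_for_judging(
            query_id=response.metadata["_monitor_record_id"],
            query_text=query_str,
            response_text=response.response,
            context_texts=[n.node.get_content() for n in response.source_nodes],
        ))
    return response
```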
阶段五:持续改进闭环
评估的最终目的不是为了得到一个漂亮的分数,而是为了驱动系统持续改进。下面是一个完整的持续改进工作流的实现:
```python
@dataclass
class ImprovementAction:
id: str
category: str
title: str
description: str
priority: int
estimated_impact: str
status: str
evidence: list[str]
created_at: datetime
completed_at: Optional[datetime] = None
result_metrics: Optional[dict] = None
class ContinuousImprovementLoop:
"""RAG 系统持续改进闭环管理器"""
def __init__(self, monitor: ProductionMonitor, judge: OnlineQualityJudge):
self.monitor = monitor
self.judge = judge
self.actions: list[ImprovementAction] = []
self.history: list[dict] = []
def weekly_review(self) -> dict:
health = self.monitor.get_health_report()
bad_cases = self.monitor.export_bad_cases(top_n=30)
quality_stats = self.judge.get_quality_stats(hours=168)
review_input = {
"review_date": datetime.now().isoformat(),
"health_status": health["status"],
"key_metrics": health["metrics"],
"alerts": health["alerts"],
"top_bad_cases": bad_cases[:10],
"quality_stats": quality_stats,
"proposed_actions": self._generate_proposals(health, bad_cases, quality_stats),
"previous_actions_status": self._summarize_previous_actions(),
}
return review_input
def _generate_proposals(self, health, bad_cases, quality_stats) -> list:
proposals = []
action_id = len(self.actions) + 1
if health["metrics"]["abandon_rate"] > 0.35:
proposals.append(ImprovementAction(
id=f"A{action_id:03d}",
category="retrieval",
title="引入 Hybrid Search 提升召回率",
description=(
"当前纯向量检索在高放弃率场景下表现不足,"
"建议引入 BM25 关键词检索与向量检索融合,"
"预计可将 hit_rate 提升 10-15%。"
),
priority=4,
estimated_impact="high",
status="proposed",
evidence=[
f"放弃率 {health['metrics']['abandon_rate']*100:.1f}% 超过阈值",
],
created_at=datetime.now(),
))
action_id += 1
if health["metrics"].get(p99_key, 0) > 10000:
proposals.append(ImprovementAction(
id=f"A{action_id:03d}",
category="infrastructure",
title="优化查询链路延迟",
description=(
f"P99 延迟达 {health['metrics'].get(p99_key, 0)/1000:.1f}s,"
"主要瓶颈可能在 embedding 推理、LLM 生成或向量数据库查询。"
"建议逐段 profiling 后针对性优化。"
),
priority=3,
estimated_impact="medium",
status="proposed",
evidence=[f"P99={health['metrics'].get(p99_key, 0):.0f}ms"],
created_at=datetime.now(),
))
action_id += 1
if quality_stats.get("avg_faithfulness", 5) < 3.5:
proposals.append(ImprovementAction(
id=f"A{action_id:03d}",
category="synthesis",
title="增强幻觉检测与约束",
description=(
"LLM 评判显示忠实度偏低,可能存在模型编造上下文中不存在的信息。"
"建议:(1) 在 system prompt 中强化基于上下文回答约束;"
"(2) 启用 FaithfulnessEvaluator 做后处理过滤;"
"(3) 考虑切换到 REFINE 合成模式。"
),
priority=5,
estimated_impact="high",
status="proposed",
evidence=[f"平均忠实度: {quality_stats.get('avg_faithfulness', 0):.2f}/5"],
created_at=datetime.now(),
))
action_id += 1
if health["metrics"]["rephrase_rate"] > 0.15:
proposals.append(ImprovementAction(
id=f"A{action_id:03d}",
category="retrieval",
title="引入 HyDE 或 Query Rewriting",
description=(
f"改写重问率达 {health['metrics']['rephrase_rate']*100:.1f}%,"
"说明首次检索的语义匹配不够精准。建议启用 HyDEQueryTransform "
"或 DecomposeQueryTransform 来改善查询理解。"
),
                priority=3,
                estimated_impact="medium",
                status="proposed",
                evidence=[f"重问率: {health['metrics']['rephrase_rate']*100:.1f}%"],
                created_at=datetime.now(),
            ))
self.actions.extend(proposals)
return proposals
def _summarize_previous_actions(self) -> list[dict]:
summary = []
completed = [a for a in self.actions if a.status == "completed"]
in_progress = [a for a in self.actions if a.status == "in_progress"]
if completed:
summary.append({"status": "completed", "count": len(completed)})
if in_progress:
summary.append({
"status": "in_progress",
"count": len(in_progress),
"items": [{"id": a.id, "title": a.title} for a in in_progress],
})
return summary
def complete_action(self, action_id: str, result_metrics: dict = None):
for a in self.actions:
if a.id == action_id:
a.status = "completed"
a.completed_at = datetime.now()
a.result_metrics = result_metrics
self.history.append({
"action_id": action_id,
"title": a.title,
"completed_at": a.completed_at.isoformat(),
"result": result_metrics,
})
break
def generate_weekly_report(self) -> str:
review = self.weekly_review()
lines = [
"# RAG 系统周报",
f"> 生成时间: {review['review_date']}",
"",
"## 📊 系统健康状态",
f"- 整体状态: **{review['health_status'].upper()}**",
f"- 本周总查询量: **{review['key_metrics'].get('total_queries', 'N/A')}**",
f"- 平均延迟: **{review['key_metrics'].get('avg_latency_ms', 0):.0f}ms**",
f"- 复制率: **{review['key_metrics'].get('copy_rate', 0)*100:.1f}%**",
f"- 放弃率: **{review['key_metrics'].get('abandon_rate', 0)*100:.1f}%**",
"",
]
if review["alerts"]:
lines.append("## 🚨 本周告警")
for alert in review["alerts"]:
icon = "🔴" if alert["level"] == "critical" else "⚠️"
lines.append(f"- {icon} [{alert['metric']}] {alert['message']}")
lines.append("")
if review["proposed_actions"]:
lines.append("## 📋 建议改进措施")
for action in review["proposed_actions"]:
stars = "⭐" * action.priority
lines.append(f"### {action.id} {action.title} {stars}")
lines.append(f"- 类别: {action.category}")
lines.append(f"- 预期影响: {action.estimated_impact}")
lines.append(f"- 依据: {'; '.join(action.evidence[:2])}")
lines.append("")
if review.get("previous_actions_status"):
lines.append("## ✅ 历史措施进展")
for s in review["previous_actions_status"]:
tag = s.get("count", "")
lines.append(f"- **{s['status']}**: {tag} 项")
lines.append("")
return "\n".join(lines)ContinuousImprovementLoop 是整个评估体系的"大脑"——它把监控数据、评判数据和历史记录整合在一起,自动生成改进提案并追踪执行状态。其中的 _generate_proposals 方法展示了一种基于规则的简单决策逻辑:当某个监控指标超过预设阈值时,自动创建对应的改进工单。当然,在生产环境中你可能希望用 LLM 来做更智能的提案生成(让 GPT-4o 分析所有数据然后给出建议),但规则方式的好处是可解释性强、不会产生奇怪的提案、且运行成本几乎为零。
评估仪表盘:一站式可视化
有了各种评估数据和监控指标之后,最后一步是把它们整合到一个可视化的仪表盘中。下面是一个基于 FastAPI + Chart.js 的评估仪表盘实现:
```python
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
import json

app = FastAPI(title="RAG Evaluation Dashboard")

# 假设:以下实例与 p99_key 常量来自前文的监控模块,
# 实际部署时在应用启动钩子中完成注入/初始化
monitor_instance = None
judge_instance = None
loop_instance = None
def get_dashboard_html(data: dict) -> str:
quality_dist_json = json.dumps(list(data.get("quality_dist", {}).keys()))
quality_vals_json = json.dumps(list(data.get("quality_dist", {}).values()))
total_q = data["health"]["metrics"].get("total_queries", 0)
abandon_r = data["health"]["metrics"].get("abandon_rate", 0)
copy_r = data["health"]["metrics"].get("copy_rate", 0)
return f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>RAG 评估仪表盘</title>
<script src="https://cdn.tailwindcss.com"></script>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
</head>
<body class="bg-gray-900 text-gray-100 min-h-screen">
<div class="container mx-auto px-4 py-8">
<div class="flex justify-between items-center mb-8">
<h1 class="text-3xl font-bold">🔍 RAG 系统评估仪表盘</h1>
<div id="status-badge" class="px-4 py-2 rounded-full text-sm font-semibold
{'bg-green-500' if data['health']['status'] == 'healthy' else
'bg-yellow-500' if data['health']['status'] == 'degraded' else 'bg-red-500'}">
{data['health']['status'].upper()}
</div>
</div>
<div class="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-4 gap-6 mb-8">
<div class="bg-gray-800 rounded-xl p-6 border border-gray-700">
<div class="text-gray-400 text-sm mb-1">总查询量</div>
<div class="text-3xl font-bold">{total_q}</div>
<div class="text-xs text-gray-500 mt-1">窗口: {data['health'].get('window', '-')}</div>
</div>
<div class="bg-gray-800 rounded-xl p-6 border border-gray-700">
<div class="text-gray-400 text-sm mb-1">平均延迟</div>
<div class="text-3xl font-bold">{data['health']['metrics'].get('avg_latency_ms', 0):.0f}<span class="text-lg">ms</span></div>
<div class="text-xs text-gray-500 mt-1">P99: {data['health']['metrics'].get(p99_key, 0):.0f}ms</div>
</div>
<div class="bg-gray-800 rounded-xl p-6 border border-gray-700">
<div class="text-gray-400 text-sm mb-1">复制率</div>
<div class="text-3xl font-bold">{copy_r*100:.1f}<span class="text-lg">%</span></div>
<div class="text-xs mt-1">{'📈' if copy_r > 0.2 else '📉'}</div>
</div>
<div class="bg-gray-800 rounded-xl p-6 border border-gray-700">
<div class="text-gray-400 text-sm mb-1">放弃率</div>
<div class="text-3xl font-bold">{abandon_r*100:.1f}<span class="text-lg">%</span></div>
<div class="text-xs mt-1">阈值: 40%</div>
</div>
</div>
{'<div class="bg-red-900/30 border border-red-700 rounded-xl p-6 mb-8">' if data['health']['alerts'] else '<div class="hidden">'}
<h2 class="text-lg font-semibold text-red-400 mb-3">🚨 活跃告警</h2>
{"".join([f'''
<div class="flex items-start gap-3 py-2 border-b border-red-800 last:border-0">
<span class="{'text-red-400' if a['level']=='critical' else 'text-yellow-400'}">●</span>
<div>
<div class="font-medium">{a['metric']}</div>
<div class="text-sm text-gray-400">{a['message']}</div>
</div>
</div>
''' for a in data['health']['alerts']])}
</div>
<div class="grid grid-cols-1 lg:grid-cols-2 gap-6 mb-8">
<div class="bg-gray-800 rounded-xl p-6 border border-gray-700">
<h2 class="text-lg font-semibold mb-4">质量评分分布</h2>
<canvas id="qualityChart" height="200"></canvas>
</div>
<div class="bg-gray-800 rounded-xl p-6 border border-gray-700">
<h2 class="text-lg font-semibold mb-4">用户行为漏斗</h2>
<canvas id="funnelChart" height="200"></canvas>
</div>
</div>
<div class="bg-gray-800 rounded-xl p-6 border border-gray-700">
<h2 class="text-lg font-semibold mb-4">⚠️ 高风险案例 TOP 10</h2>
<div class="overflow-x-auto">
<table class="w-full text-sm">
<thead><tr class="text-left text-gray-400 border-b border-gray-700">
<th class="pb-2">风险分</th><th class="pb-2">问题</th>
<th class="pb-2">信号</th><th class="pb-2">时间</th>
</tr></thead>
<tbody>
{"".join([f'''
<tr class="border-b border-gray-700/50">
<td class="py-2 font-mono {'text-red-400' if c['risk_score'] >= 3 else 'text-yellow-400'}">{c['risk_score']}</td>
<td class="py-2 max-w-md truncate">{c['query'][:60]}</td>
<td class="py-2 text-xs">
{'✓复制' if c['signals']['copied'] else '✗'} {'✗放弃' if c['signals']['abandoned'] else ' '}
{'↻重问' if c['signals']['rephrased'] else ' '}
</td>
<td class="py-2 text-gray-500">{c['timestamp'][11:16]}</td>
</tr>
''' for c in data.get('bad_cases', [])[:10]])}
</tbody>
</table>
</div>
</div>
</div>
<script>
new Chart(document.getElementById('qualityChart'), {{
type: 'doughnut',
data: {{
labels: {quality_dist_json},
datasets: [{{
data: {quality_vals_json},
backgroundColor: ['#22c55e', '#84cc16', '#eab308', '#f97316', '#ef4444']
}}]
}},
options: {{ responsive: true, plugins: {{ legend: {{ position: 'bottom' }} }} }}
}});
new Chart(document.getElementById('funnelChart'), {{
type: 'bar',
data: {{
labels: ['总查询', '有交互', '复制', '满意'],
datasets: [{{
data: [
{total_q},
{int(total_q * (1 - abandon_r))},
{int(total_q * copy_r)},
{int(total_q * copy_r * 0.7)}
],
backgroundColor: ['#6366f1', '#8b5cf6', '#a855f7', '#d946ef']
}}]
}},
options: {{ responsive: true, indexAxis: 'y', plugins: {{ legend: {{ display: false }} }} }}
}});
</script>
</body>
</html>"""
@app.get("/", response_class=HTMLResponse)
async def dashboard():
if not monitor_instance:
raise HTTPException(status_code=503, detail="Monitor not initialized")
health = monitor_instance.get_health_report()
bad_cases = monitor_instance.export_bad_cases(top_n=20)
quality_stats = judge_instance.get_quality_stats(hours=1) if judge_instance else {}
data = {
"health": health,
"bad_cases": bad_cases,
"quality_dist": quality_stats.get("score_distribution", {}),
}
return get_dashboard_html(data)
@app.get("/api/health")
async def api_health():
if not monitor_instance:
raise HTTPException(status_code=503, detail="Monitor not initialized")
return monitor_instance.get_health_report()
@app.get("/api/bad-cases")
async def api_bad_cases(limit: int = 20):
if not monitor_instance:
raise HTTPException(status_code=503, detail="Monitor not initialized")
return monitor_instance.export_bad_cases(top_n=limit)
@app.get("/api/weekly-report")
async def api_weekly_report():
if not loop_instance:
raise HTTPException(status_code=503, detail="Improvement loop not initialized")
return {"report": loop_instance.generate_weekly_report()}启动仪表盘服务后,你就可以在浏览器中看到一个实时的 RAG 系统健康面板,包含核心指标卡片、活跃告警、质量分布饼图、用户行为漏斗图和高风险案例列表。这个仪表盘可以作为团队每日站会的讨论基础,也可以投屏到大屏幕上作为团队的"质量温度计"。FastAPI 提供的同时还有 /api/health、/api/bad-cases、/api/weekly-report 等 RESTful API 端点,方便与其他运维系统集成(比如 Prometheus 抓取、Grafana 展示、钉钉机器人推送等)。
常见误区与避坑指南
在搭建和使用 RAG 评估体系的过程中,有几个特别常见的错误值得单独提出来:
误区一:只看平均值不看分布
这是新手最容易犯的错误。假设你有 50 个测试问题,Faithfulness 平均分 0.90,看起来很棒对吧?但如果其中 5 个问题的得分是 0.0(完全胡说八道),另外 45 个是 1.0(完美),平均下来还是 0.90。这 5 个零分问题可能恰好是你最重要的业务场景(比如产品定价咨询),但你被平均值蒙蔽了双眼。
正确做法:始终关注指标的分布情况,特别是 P10 和 P90 分位数。在 BaselineBuilder 中我们已经保存了逐题详情,记得用它来做分布分析而不是只看均值。一个实用的技巧是:把所有问题的得分按从低到高排序画出来,如果曲线的左尾部有明显拖尾,那就说明存在系统性短板需要优先解决。
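下面是一个分布分析的小工具示意,用"45 个满分 + 5 个零分"的例子验证:均值 0.90 看似健康,P10 和低分占比却立刻暴露问题:

```python
import math
import statistics

def score_distribution(scores: list[float]) -> dict:
    """除均值外,额外给出 P10/P90(最近秩法)与低分占比"""
    s = sorted(scores)
    pct = lambda p: s[max(0, math.ceil(len(s) * p) - 1)]  # 最近秩分位数
    return {
        "mean": round(statistics.mean(s), 3),
        "p10": pct(0.10),
        "p90": pct(0.90),
        "below_0.5_ratio": sum(1 for x in s if x < 0.5) / len(s),
    }

print(score_distribution([0.0] * 5 + [1.0] * 45))
# {'mean': 0.9, 'p10': 0.0, 'p90': 1.0, 'below_0.5_ratio': 0.1}
```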
误区二:测试集泄露到训练数据
如果你用 LLM 生成评估问题集,然后又把这些问题和答案加入到知识库的数据源中,那就出现了严重的泄露——系统当然能答好这些问题,因为这些答案就在它的"课本"里。更隐蔽的泄露方式是:你在调试过程中反复用同一批问题测试,然后根据测试结果微调了 chunking 参数或 prompt 模板,这本质上也是一种过拟合。
正确做法:维护三套独立的数据集——开发集(日常调试用)、验证集(调参选模型用)、测试集(最终汇报用)。测试集在整个开发过程中只能使用一次,就像期末考试一样。如果你发现自己在反复查看测试集的结果来指导开发,那就应该立即停下来,换回开发集继续工作。
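切分本身也有讲究:按问题文本哈希做稳定切分,可以保证问题集更新后老问题仍落在原来的集合里,不会因重新随机而互相污染。一个最小示意:

```python
import hashlib

def split_questions(questions: list[str]) -> dict[str, list[str]]:
    """按问题文本哈希做稳定切分:60% 开发集 / 20% 验证集 / 20% 测试集"""
    splits = {"dev": [], "val": [], "test": []}
    for q in questions:
        # 同一问题的哈希桶永远相同,集合归属与生成顺序无关
        bucket = int(hashlib.md5(q.encode("utf-8")).hexdigest(), 16) % 10
        if bucket < 6:
            splits["dev"].append(q)
        elif bucket < 8:
            splits["val"].append(q)
        else:
            splits["test"].append(q)
    return splits
```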
误区三:评估指标与业务目标脱节
Faithfulness 得分高不一定意味着用户满意。一个极端的例子:系统对每个问题都回答"抱歉,我没有找到相关信息",Faithfulness 可能是 1.0(因为没有编造任何信息),但用户满意度显然是 0。另一个常见情况是:你的 MRR 从 0.75 提升到了 0.80,团队庆祝了一番,但客户那边客服转接率没有任何变化——因为 MRR 的提升主要来自于那些本来就能答好的问题变得更精准了,而真正导致用户转人工的"困难问题"并没有改善。
正确做法:建立"评估指标 → 业务指标"的映射关系表,定期做用户调研或 A/B 测试来验证映射的有效性。必要时引入业务侧指标(如客服工单转接率、用户次日留存、平均会话时长等)作为辅助校验。记住:评估是为了服务于业务目标,而不是为了追求漂亮的数字。
误区四:忽视评估本身的成本
完整的评估流程(特别是涉及 LLM-as-Judge 的)是不便宜的。前面提到过,50 个问题 × 2 个评价指标 = 100 次 GPT-4o 调用,按当前价格大约几美元。如果你想做更全面的评估(加上检索指标、多个 LLM judge 交叉验证、不同 prompt 变体的 A/B 测试),单次评估的成本轻松达到几十美元。如果还要在 CI 里每次 PR 都跑,一个月下来就是一笔不小的开支。
正确做法:建立分层评估策略——PR 阶段用小规模抽样(10 个问题)+ 便宜模型(gpt-4o-mini)做快速门禁;合入 main 后跑完整评估(50 个问题)+ 强模型(gpt-4o)做正式验收;每周或每两周做一次深度评估(100 个问题)+ 多 judge 投票做趋势分析。同时,尽可能缓存中间计算结果(如 embedding、检索结果),避免重复计算。
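分层策略可以收敛成一张配置表,由 CI 按触发场景选用。一个示意(数值按团队预算调整):

```python
from dataclasses import dataclass

@dataclass
class EvalTier:
    name: str
    num_questions: int
    judge_model: str
    num_judges: int  # 多 judge 投票的评审数量

# 分层评估策略的配置示意
TIERS = {
    "pr":     EvalTier("PR 快速门禁",  10,  "gpt-4o-mini", 1),
    "main":   EvalTier("合入正式验收", 50,  "gpt-4o",      1),
    "weekly": EvalTier("周度深度评估", 100, "gpt-4o",      3),
}

def pick_tier(trigger: str) -> EvalTier:
    return TIERS.get(trigger, TIERS["pr"])
```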