跳转到内容

RAG 知识库管理

增量更新知识库

python
import pandas as pd
import numpy as np
from datetime import datetime

class KnowledgeBaseManager:
    """RAG knowledge-base manager.

    Stores chunks in a pandas DataFrame and supports incremental updates:
    batch insertion, in-place content updates, and soft deletion (deactivation)
    by source document.
    """

    def __init__(self, kb_df=None):
        """Initialize with an optional pre-existing knowledge-base DataFrame."""
        # Empty frame by default; bookkeeping columns are created by add_chunks().
        self.kb = kb_df if kb_df is not None else pd.DataFrame()
        self.version = 1

    def add_chunks(self, chunks_df):
        """Append new chunks in bulk and return how many were added.

        Adds bookkeeping columns (version, created_at/updated_at, is_active,
        embedding_status) and auto-generates sequential chunk_ids when the
        caller did not supply a 'chunk_id' column.
        """
        now = datetime.now()

        chunks_df = chunks_df.copy()  # never mutate the caller's frame
        chunks_df['version'] = self.version
        chunks_df['created_at'] = now
        chunks_df['updated_at'] = now
        chunks_df['is_active'] = True
        # Embeddings are computed by a separate pipeline; new rows start pending.
        chunks_df['embedding_status'] = 'pending'

        if 'chunk_id' not in chunks_df.columns:
            # Sequential ids continue from the current size of the store.
            base_id = len(self.kb) + 1
            chunks_df.insert(0, 'chunk_id',
                             [f'ch_{base_id+i:06d}' for i in range(len(chunks_df))])

        self.kb = pd.concat([self.kb, chunks_df], ignore_index=True)
        return len(chunks_df)

    def update_chunk_content(self, chunk_id, new_content):
        """Replace one chunk's content and mark it for re-embedding.

        Raises ValueError when the chunk does not exist — including when the
        knowledge base is still empty (an empty frame has no 'chunk_id'
        column, so the unguarded lookup would raise KeyError instead).
        """
        if 'chunk_id' not in self.kb.columns:
            raise ValueError(f"Chunk '{chunk_id}' 不存在")

        mask = self.kb['chunk_id'] == chunk_id
        if not mask.any():
            raise ValueError(f"Chunk '{chunk_id}' 不存在")

        self.kb.loc[mask, 'content'] = new_content
        self.kb.loc[mask, 'updated_at'] = datetime.now()
        self.kb.loc[mask, 'version'] += 1
        # Content changed, so any existing embedding is stale.
        self.kb.loc[mask, 'embedding_status'] = 'pending'
        return True

    def deactivate_by_source(self, source_doc):
        """Soft-delete every chunk from *source_doc*; return the count."""
        # An empty/uninitialized store has no 'source_doc' column: nothing to do.
        if 'source_doc' not in self.kb.columns:
            return 0
        mask = self.kb['source_doc'] == source_doc
        count = int(mask.sum())
        self.kb.loc[mask, 'is_active'] = False
        return count

    def get_active_kb(self):
        """Return a copy of the currently active (non-deleted) chunks."""
        if 'is_active' not in self.kb.columns:
            return self.kb.copy()  # nothing was ever added: empty frame
        return self.kb[self.kb['is_active']].copy()


# --- Demo: incremental knowledge-base maintenance ---------------------------
np.random.seed(42)
kb = KnowledgeBaseManager()

# First batch: five chunks extracted from a PDF paper.
initial = pd.DataFrame({
    'content': [f'文档A 第{idx}段内容' for idx in range(5)],
    'source_doc': ['doc_a.pdf'] * 5,
    'source_type': ['paper'] * 5,
    'token_count': np.random.randint(100, 500, 5),
})
n_added = kb.add_chunks(initial)
print(f"初始导入: {n_added} 个 chunks")

# Second batch: three chunks from an HTML API-docs page.
additional = pd.DataFrame({
    'content': [f'文档B 第{idx}段内容' for idx in range(3)],
    'source_doc': ['doc_b.html'] * 3,
    'source_type': ['api_docs'] * 3,
    'token_count': np.random.randint(150, 600, 3),
})
n_added2 = kb.add_chunks(additional)
print(f"追加导入: {n_added2} 个 chunks")

# Edit one chunk in place, then soft-delete everything from doc_a.pdf.
kb.update_chunk_content('ch_000001', '更新后的文档A第1段内容...')
print("✓ 已更新 ch_000001")

deactivated = kb.deactivate_by_source('doc_a.pdf')
print(f"已停用 doc_a.pdf 的 {deactivated} 个 chunks")

# Only doc_b chunks should remain active.
active = kb.get_active_kb()
print(f"\n活跃 chunks: {len(active)}")
print(active[['chunk_id', 'source_doc', 'is_active']].head(8))

知识库去重

python
import pandas as pd
from difflib import SequenceMatcher

class KBDeduplicator:
    """知识库去重器"""

    @staticmethod
    def exact_dedup(kb_df):
        """精确去重(完全相同的内容)"""
        before = len(kb_df)
        deduped = kb_df.drop_duplicates(subset=['content'], keep='first')
        removed = before - len(deduped)
        print(f"精确去重: {before}{len(deduped)} (移除 {removed})")
        return deduped

    @staticmethod
    def fuzzy_dedup(kb_df, threshold=0.9):
        """模糊去重(相似度高于阈值的)"""
        before = len(kb_df)
        contents = kb_df['content'].tolist()
        keep_mask = [True] * len(contents)
        seen = []

        for i, content in enumerate(contents):
            if not keep_mask[i]:
                continue
            is_dup = False
            for seen_content in seen:
                ratio = SequenceMatcher(None, content[:200],
                                          seen_content[:200]).ratio()
                if ratio > threshold:
                    keep_mask[i] = False
                    is_dup = True
                    break
            if not is_dup:
                seen.append(content)

        result = kb_df[keep_mask].reset_index(drop=True)
        removed = before - len(result)
        print(f"模糊去重(>{threshold}): {before}{len(result)} (移除 {removed})")
        return result


# Build a small test corpus: exact duplicates, a near-duplicate (case
# difference only), and unique lines — repeated 10x to amplify counts.
_docs = [
    'Pandas 是 Python 数据分析库',
    'Pandas 是 Python 数据分析库',
    'DataFrame 是二维表格结构',
    'Dataframe 是二维表格结构',
    'groupby 用于分组聚合操作',
    'merge() 实现表连接',
]
test_kb = pd.DataFrame({'content': _docs * 10})

exact_clean = KBDeduplicator.exact_dedup(test_kb)
fuzzy_clean = KBDeduplicator.fuzzy_dedup(test_kb, threshold=0.85)

知识库质量报告

python
class KBQualityReporter:
    """Generates and prints a quality report for a RAG knowledge base."""

    @staticmethod
    def generate(kb_df):
        """Print a formatted quality report for *kb_df* and return it as a dict.

        All columns are treated as optional, consistently with the existing
        guards for 'token_count'/'source_type'/'quality_score': when
        'is_active' is missing every row counts as active, and the
        'embedding_status' section is emitted only when that column exists
        (the original raised KeyError on either missing column).
        """
        df = kb_df.copy()
        total = len(df)
        active = df[df['is_active']] if 'is_active' in df.columns else df

        report = {
            '总 chunk 数': total,
            '活跃 chunk 数': len(active),
            '覆盖率': f"{len(active)/total*100:.1f}%" if total > 0 else "N/A",
        }

        if 'embedding_status' in active.columns:
            status = active['embedding_status']
            report.update({
                'Embedding 完成': int((status == 'done').sum()),
                'Embedding 待处理': int((status == 'pending').sum()),
                'Embedding 失败': int((status == 'failed').sum()),
            })

        if 'token_count' in active.columns and len(active) > 0:
            tokens = active['token_count']
            report.update({
                '平均 Token 数': round(tokens.mean(), 0),
                'Token 总计': int(tokens.sum()),
                '最大 Token 数': int(tokens.max()),
                '最小 Token 数': int(tokens.min()),
            })

        if 'source_type' in active.columns:
            # value_counts().to_dict() already yields a plain dict; the
            # original wrapped it in a redundant dict() call.
            report['来源分布'] = active['source_type'].value_counts().to_dict()

        if 'quality_score' in active.columns:
            qs = active['quality_score']
            report.update({
                '平均质量分': round(qs.mean(), 3),
                '低质量(<0.5)': int((qs < 0.5).sum()),
                '高质量(>0.8)': int((qs > 0.8).sum()),
            })

        print("=" * 50)
        print("RAG 知识库质量报告")
        print("=" * 50)
        for k, v in report.items():
            if isinstance(v, dict):
                # Nested section (e.g. source distribution): one line per key.
                print(f"\n{k}:")
                for sk, sv in v.items():
                    print(f"  {sk}: {sv}")
            else:
                print(f"  {k:<20s}: {v}")

        return report


# --- Demo: quality report on a synthetic 50-chunk knowledge base ------------
np.random.seed(42)
sample_kb = pd.DataFrame({
    'chunk_id': [f'ch_{i:04d}' for i in range(50)],
    'content': [f'Content {i}...' for i in range(50)],
    'source_type': pd.Categorical(
        np.random.choice(['wiki', 'api_docs', 'blog', 'paper'], 50)
    ),
    'token_count': np.random.randint(50, 2000, 50),
    'quality_score': np.random.beta(2, 4, 50).round(3),
    # Roughly 10% of chunks pre-marked inactive.
    'is_active': np.random.choice([True]*45 + [False]*5, 50),
    # Bug fix: Categorical lives in pandas, not numpy — np.Categorical raised
    # AttributeError at runtime (cf. the correct pd.Categorical use above for
    # 'source_type'). The 3:1:1 choice pool skews statuses toward 'done'.
    'embedding_status': pd.Categorical(
        np.random.choice(['done', 'done', 'done', 'pending', 'failed'], 50)
    ),
})

KBQualityReporter.generate(sample_kb)

基于 MIT 许可发布