主题
多源数据融合:同时从数据库 + 文件 + API 加载数据
前面的章节我们分别学习了如何从文件、数据库、API、云服务等单一数据源加载数据。但在真实的企业环境中,知识几乎总是分散在多个地方的——产品手册是 PDF 存在 S3 上、FAQ 是 Markdown 文件在 Git 仓库里、最新政策写在 Notion Database 中、客户工单在 Jira 里、技术方案讨论在 GitHub Issues 上。
如果你只能搜索其中某一个来源,那得到的信息注定是不完整的。用户问"退款流程是什么",答案可能一半在公司政策 PDF 里,另一半在 Notion 的操作指南中。多源数据融合(Multi-Source Data Fusion)的目标就是把散落在各处的知识汇聚到一个统一的 RAG 索引中,让用户的一次查询就能触及所有相关知识。
这一节我们会学习如何在实际项目中实现多源数据融合,包括架构设计、数据去重、冲突解决、元数据标准化等关键问题。
为什么需要多源融合?
先看一个具体的场景来感受单源搜索的局限性:
一家电商公司的客服团队需要 RAG 系统来辅助回答客户问题。他们的知识分布在:
- Confluence(内部 Wiki):产品功能介绍、技术架构说明
- Notion:最新的运营政策和促销规则(更新频率高)
- PostgreSQL 数据库:商品信息(名称、价格、规格、库存)
- Jira:已知问题和解决方案的历史记录
- SharePoint:合同模板和法务文档
- Slack:临时的决策记录和非正式经验分享
如果客服问"某款商品的保修期是多长",答案可能在 Confluence 的产品文档里,也可能在 Notion 的最新政策更新中,还可能在数据库的商品信息表里。只有同时搜索所有来源,才能给出完整且准确的回答。
基础的多源加载模式
最直接的方式就是分别从各个数据源加载 Document,然后合并到一个列表中:
python
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.readers.database import DatabaseReader
from llama_index.readers.notion import NotionPageReader
from llama_index.readers.jira import JiraReader
from llama_index.readers.slack import SlackReader
all_documents = []
# ===== 来源 1:本地文件(产品文档)=====
file_reader = SimpleDirectoryReader(
input_dir="./docs/product_manuals",
required_exts=[".pdf", ".md"],
file_metadata=lambda fname: {"source_type": "local_file"},
)
all_documents.extend(file_reader.load_data())
# ===== 来源 2:PostgreSQL(商品信息)=====
db_reader = DatabaseReader(
sql_database="postgresql://user:pass@localhost:5432/ecommerce",
sql_query="""
SELECT id, name, category, description, warranty_months,
price, specifications
FROM products
WHERE is_active = true
""",
)
db_docs = db_reader.load_data()
for doc in db_docs:
doc.metadata["source_type"] = "database_postgres"
all_documents.extend(db_docs)
# ===== 来源 3:Notion(运营政策)=====
notion_reader = NotionPageReader(
integration_token=os.getenv("NOTION_TOKEN"),
)
notion_docs = notion_reader.load_data(database_id="policy-db-uuid")
for doc in notion_docs:
doc.metadata["source_type"] = "notion_policy"
all_documents.extend(notion_docs)
# ===== 来源 4:Jira(问题记录)=====
jira_reader = JiraReader(
server_url="https://support.atlassian.net",
email="bot@company.com",
api_token=os.getenv("JIRA_TOKEN"),
)
jira_docs = jira_reader.load_data(
jql_query='project = SUPPORT AND resolution = Fixed',
max_results=200,
)
for doc in jira_docs:
doc.metadata["source_type"] = "jira_issues"
all_documents.extend(jira_docs)
# ===== 来源 5:Slack(团队讨论)=====
slack_reader = SlackReader(
slack_token=os.getenv("SLACK_TOKEN"),
channel_ids=["C01SUPPORT"],
earliest_timestamp=str(int(time.time()) - 86400 * 90), # 最近90天
)
slack_docs = slack_reader.load_data()
for doc in slack_docs:
doc.metadata["source_type"] = "slack_messages"
all_documents.extend(slack_docs)
# ===== 统一建索引 =====
print(f"总计从 {5} 个数据源加载了 {len(all_documents)} 个文档")
index = VectorStoreIndex.from_documents(all_documents)
query_engine = index.as_query_engine(similarity_top_k=5)
response = query_engine.query("智能音箱 S1 的保修期是多久?")
print(response.response)
print("\n--- 来源分析 ---")
for node in response.source_nodes:
print(f" [{node.metadata.get('source_type')}] "
f"分数: {node.score:.3f}")这段代码展示了多源融合的基本模式。注意几个关键点:
第一,source_type 元数据标记。 我们在每个来源的 Document上都打上了 source_type 标签。这看似简单,却极其重要——它让后续的检索结果分析成为可能。当你发现某些查询总是返回低质量结果时,可以通过 source_type 定位是哪个数据源出了问题。
第二,顺序无关性。 VectorStoreIndex.from_documents() 不关心 Document 来自哪里、以什么顺序传入。它会统一地对所有 Document 做分块、嵌入和索引。这意味着你可以随时添加新的数据源,而不需要修改现有的索引逻辑。
第三,不同来源的 Document 可能质量差异很大。 Slack 消息可能是碎片化的对话片段("那个保修的事我看了一下好像是两年"),而 PDF 文档则是正式的、结构化的内容。它们被平等地送入同一个向量空间,这在某些情况下可能导致质量问题——后面会讲如何应对。
数据去重:同一知识出现多次怎么办?
多源融合的一个典型问题是数据重复。同一份文档可能同时在本地文件系统和 Notion 中存在,同一个问题的解答可能既在 Jira 的 Issue 中又在 Confluence 的Wiki 页面里。如果不做去重,检索结果中会出现大量重复内容,浪费 LLM 的上下文窗口并降低答案质量。
基于内容的去重
最直观的去重方式是基于文本内容的相似度判断:
python
from hashlib import md5
from difflib import SequenceMatcher
def content_hash(text: str) -> str:
"""生成文本的哈希值(用于精确去重)"""
return md5(text.encode()).hexdigest()
def is_similar(text1: str, text2: str, threshold: float = 0.85) -> bool:
"""判断两段文本是否相似(用于模糊去重)"""
return SequenceMatcher(None, text1, text2).ratio() >= threshold
def deduplicate_documents(
documents: list,
mode: str = "exact", # "exact" 或 "fuzzy"
threshold: float = 0.85,
) -> list:
"""对文档列表进行去重"""
seen = set()
unique = []
for doc in documents:
if mode == "exact":
h = content_hash(doc.text)
if h not in seen:
seen.add(h)
unique.append(doc)
elif mode == "fuzzy":
is_dup = False
for existing in unique:
if is_similar(doc.text, existing.text, threshold):
is_dup = True
break
if not is_dup:
unique.append(doc)
return unique
# 使用
original_count = len(all_documents)
unique_docs = deduplicate_documents(all_documents, mode="fuzzy")
removed = original_count - len(unique_docs)
print(f"去重: {original_count} → {len(unique_docs)} (移除 {removed} 个重复)")精确去重(mode="exact") 适用于完全相同的文档在不同来源中出现的情况(比如同一份 PDF 既在本地又在 S3 上)。它速度快(O(n) 复杂度)且不会误删。
模糊去重(mode="fuzzy") 适用于内容相似但表述不完全一致的情况(比如同一份文档在 Confluence 和 Notion 中可能有轻微的编辑差异)。代价是速度慢(O(n²) 复杂度,因为要两两比较)且可能误删真正不同的内容。阈值的选择很重要——太高则漏掉重复,太低则误删不同内容。
基于 ID 的去重
如果你的数据源都有可靠的全局唯一标识符(如文档 ID、URL、主键等),这是最高效的去重方式:
python
def deduplicate_by_id(documents: list, id_field: str = "id") -> list:
seen_ids = set()
unique = []
for doc in documents:
doc_id = doc.metadata.get(id_field)
if doc_id and doc_id not in seen_ids:
seen_ids.add(doc_id)
unique.append(doc)
elif not doc_id:
unique.append(doc) # 没有 ID 的保留
return unique这种方式的前提是你的各个数据源都能提供可靠的 ID。如果做不到,就退回到基于内容的去重。
冲突解决:同一事实有不同说法怎么办?
比重复更棘手的问题是冲突——关于同一个事实,不同数据源给出了不同的说法。比如:
- 产品手册 PDF 写着"保修期 24 个月"
- Notion 的最新政策写着"保修期调整为 12 个月(2025年1月1日起生效)"
- Slack 讨论中有人说"好像改成了18个月"
这时候该怎么办?
策略一:信任层级(Trust Hierarchy)
为每个数据源分配一个可信度等级,冲突时优先采信高等级来源:
python
SOURCE_TRUST_LEVELS = {
"notion_policy": 5, # 最高:官方政策文档
"local_file_pdf": 4, # 高:正式发布的文档
"database_postgres": 4, # 高:结构化主数据
"confluence": 3, # 中:团队 Wiki
"jira_issues": 2, # 低:历史记录
"slack_messages": 1, # 最低:非正式讨论
}
def resolve_conflict(source_nodes):
"""根据信任层级解决冲突"""
if len(source_nodes) <= 1:
return source_nodes
best = max(
source_nodes,
key=lambda n: SOURCE_TRUST_LEVELS.get(
n.metadata.get("source_type", "unknown"), 0
),
)
return [best]策略二:时间优先(Recency Wins)
对于时效性强的信息(如价格、政策、状态),采用"最新的为准"原则:
python
from datetime import datetime
def resolve_by_recency(source_nodes):
"""根据时间戳解决冲突,取最新的"""
def get_time(node):
ts_str = node.metadata.get("updated_at")
or node.metadata.get("created_at")
or node.metadata.get("timestamp")
if ts_str:
try:
return datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
except ValueError:
pass
return datetime.min
latest = max(source_nodes, key=get_time)
return [latest]策略三:交给 LLM 判断
最灵活但也最昂贵的方式是把冲突信息都提供给 LLM,让它来判断:
python
conflicting_info = "\n".join([
f"[{n.metadata.get('source_type')} - {n.metadata.get('updated_at', '未知时间')}]\n{n.text}"
for n in source_nodes
])
prompt = f"""以下是关于同一主题的不同来源的信息,其中可能存在矛盾。
请综合判断最准确的答案,并说明你的理由。
{conflicting_info}
"""实际项目中,策略一和策略二的组合使用最为常见——先用信任层级筛选,再用时间戳在同层级内排序。策略三只在关键决策场景中使用。
元数据标准化
不同数据源返回的 metadata 字段名和格式各不相同。Notion 返回的是 "Title"(首字母大写),Jira 返回的是 "summary"(小写),数据库返回的是 "title"(全小写)。如果不做标准化,后续的过滤和查询就会非常混乱。
python
def standardize_metadata(doc, source_type: str):
"""将不同来源的元数据标准化为统一的 schema"""
source = doc.metadata.get("source", "")
created = (
doc.metadata.get("created_at")
or doc.metadata.get("creation_date")
or doc.metadata.get("timestamp")
or ""
)
updated = (
doc.metadata.get("updated_at")
or doc.metadata.get("last_modified")
or doc.metadata.get("modification_date")
or created
)
title = (
doc.metadata.get("title")
or doc.metadata.get("Title")
or doc.metadata.get("summary")
or doc.metadata.get("name")
or doc.metadata.get("file_name")
or "Untitled"
)
author = (
doc.metadata.get("author")
or doc.metadata.get("owner")
or doc.metadata.get("assignee")
or doc.metadata.get("created_by")
or "Unknown"
)
doc.metadata.update({
"_std_source_type": source_type,
"_std_source_url": source,
"_std_title": title,
"_std_author": author,
"_std_created_at": created,
`_std_updated_at`: updated,
"_std_original_metadata": doc.metadata.copy(), # 保留原始 metadata
})
return doc
# 批量标准化
standardized_docs = [
standardize_metadata(doc, doc.metadata.get("source_type", "unknown"))
for doc in all_documents
]以 _std_ 前缀命名的标准化字段保证了无论原始数据来自哪里,你都可以用统一的字段名来做过滤和分析。同时 _std_original_metadata 保留了完整的原始 metadata,以防需要回溯。
按来源加权检索
不同来源的数据质量不同,也许你希望来自官方文档的结果排名更高,而 Slack 消息的排名适当降低。这可以通过在 Node 层面添加权重来实现:
python
from llama_index.core import Document
from llama_index.core.schema import TextNode
SOURCE_WEIGHTS = {
"notion_policy": 1.5, # 官方政策:提升权重
"local_file_pdf": 1.3, # 正式文档:略微提升
"database_postgres": 1.2, # 主数据:略微提升
"confluence": 1.0, # Wiki:正常权重
"jira_issues": 0.9, # Issue:略微降低
"slack_messages": 0.7, # Slack:降低权重
}
def apply_source_weights(documents):
"""为每个 Document 添加来源权重"""
weighted_nodes = []
for doc in documents:
source_type = doc.metadata.get("source_type", "unknown")
weight = SOURCE_WEIGHTS.get(source_type, 1.0)
node = TextNode(
text=doc.text,
metadata={
**doc.metadata,
"_source_weight": weight,
},
)
weighted_nodes.append(node)
return weighted_nodes
weighted_nodes = apply_source_weights(standardized_docs)
index = VectorStoreIndex(nodes=weighted_nodes)权重可以在后处理器中使用——比如在检索结果返回前,将相关性分数乘以来源权重来调整排名。不过需要注意,权重的调整应该在充分评估的基础上进行,而不是凭感觉设定。第八章会讲的评估体系可以帮助你找到最优的权重配置。
架构最佳实践
对于一个生产级别的多源 RAG 系统,推荐的架构如下:
┌──────────────────────────────────────────────┐
│ 数据采集调度器 (Scheduler) │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │定时触发 │ │事件触发 │ │手动触发 │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ └──────────┼──────────┘ │
│ ▼ │
│ ┌───────────────────────────────────────┐ │
│ │ Source Connector Layer │ │
│ │ │ │
│ │ FileReader │ DBReader │ APIReader │ │
│ │ CloudReader│ CustomReader│ ... │ │
│ └───────────────────┬───────────────────┘ │
│ ▼ │
│ ┌───────────────────────────────────────┐ │
│ │ Data Processing Pipeline │ │
│ │ │ │
│ │ 1. 元数据标准化 │ │
│ │ 2. 内容清洗 │ │
│ │ 3. 去重 │ │
│ │ 4. 冲突检测与标记 │ │
│ │ 5. 来源加权 │ │
│ └───────────────────┬───────────────────┘ │
│ ▼ │
│ ┌───────────────────────────────────────┐ │
│ │ Unified Index (VectorStore) │ │
│ └───────────────────┬───────────────────┘ │
│ ▼ │
│ Query Engine → Response │
└──────────────────────────────────────────────┘这个架构的核心思想是关注点分离:数据采集、数据处理、索引构建、查询服务各自独立,通过标准化的 Document 格式作为中间协议进行通信。这样的好处是:
- 新增数据源只需在 Connector Layer 添加一个新的 Reader
- 数据处理逻辑的修改不影响数据采集
- 索引可以独立重建而不需要重新采集数据
- 每一层都可以独立扩展和替换
常见误区
误区一:"把所有数据源的数据全部加载到一个索引就好"。 这是最简单的做法但不一定是最好的。如果某些数据源的查询模式与其他数据源截然不同(比如结构化的商品数据 vs 非结构化的讨论帖),考虑为它们建立独立的索引并使用路由查询(第五章会讲 Router Query Engine),效果往往更好。
误区二:"去重越彻底越好"。 过度的去重(尤其是模糊去重阈值设得太低)可能会误删有价值的内容。"同一件事的两种不同表述"有时候恰好包含了互补的信息。宁可保留少量重复,也不要丢失独特信息。
误区三:"所有数据源同等重要"。 几乎不可能。官方文档的可信度天然高于闲聊消息,结构化数据的准确性天然高于自由文本。承认这种差异并通过权重或分层来体现它,会让你的 RAG 系统输出更可靠的结果。
误区四:"多源融合是一次性的工作"。 数据源会变化(新系统上线、旧系统退役)、数据分布会漂移(某些来源的质量随时间下降)、业务需求会演进(新的查询类型出现)。多源融合是一个持续运营的过程,需要定期的审查和调整。