跳转到内容

3.1 数据插入:单条、批量与 Partition

插入数据看似简单,但批量大小和 Partition 策略直接影响写入性能和搜索效率


这一节在讲什么?

上一章我们定义好了 Collection 的 Schema,现在要把数据插进去了。Milvus 的 insert() 方法用起来很简单——传入一个字典列表就行。但在生产环境中,"能插入"和"插得快"是两回事——批量大小怎么选、Partition 怎么用、flush 的时机怎么把握,这些细节直接影响写入性能和数据的可搜索性。这一节我们从最简单的插入开始,逐步深入到批量优化和 Partition 策略。


基本插入:insert()

Milvus 的 insert() 方法接受一个字典列表,每个字典代表一条数据:

python
from pymilvus import MilvusClient

client = MilvusClient(uri="http://localhost:19530")

# 单条插入
client.insert(
    collection_name="documents",
    data=[
        {
            "id": 1,
            "content": "Milvus is a distributed vector database",
            "source": "official_docs",
            "category": "tech",
            "embedding": [0.1] * 768,
            "metadata": {"author": "Zilliz", "version": "2.5"}
        }
    ]
)

# 批量插入——推荐!一次插入多条数据
documents = [
    {
        "id": i,
        "content": f"Document content {i}",
        "source": "wiki",
        "category": "tech" if i % 2 == 0 else "science",
        "embedding": [0.01 * (i % 100)] * 768,
        "metadata": {"page": i, "chapter": i // 10}
    }
    for i in range(2, 102)
]

client.insert(
    collection_name="documents",
    data=documents
)

insert() 的返回值是一个字典,包含 insert_count(成功插入的条数)和 ids(插入数据的 ID 列表):

python
result = client.insert(collection_name="documents", data=documents)
print(f"Inserted {result['insert_count']} records")
print(f"IDs: {result['ids'][:5]}...")  # 打印前 5 个 ID

批量插入的性能优化

Milvus 的写入流程是:应用 → 消息队列 → DataNode → 对象存储。每次 insert() 调用都会产生一次消息队列的写入和 DataNode 的消费。如果你逐条插入 10 万条数据,就会产生 10 万次消息队列交互——这非常低效。

批量大小的选择

python
import numpy as np

def batch_insert(client, collection_name, all_data, batch_size=1000):
    """分批插入数据,控制每批的大小"""
    total = len(all_data)
    for i in range(0, total, batch_size):
        batch = all_data[i:i + batch_size]
        result = client.insert(collection_name=collection_name, data=batch)
        print(f"Batch {i // batch_size + 1}: inserted {result['insert_count']} records")

    # 插入完成后 flush,确保数据可搜索
    client.flush(collection_name)
    print(f"Done! Total {total} records inserted and flushed.")

# 生成 10 万条测试数据
all_data = [
    {
        "id": i,
        "content": f"Document {i}",
        "source": "test",
        "category": "tech",
        "embedding": np.random.rand(768).tolist(),
        "metadata": {"index": i}
    }
    for i in range(100000)
]

# 每批 5000 条插入
batch_insert(client, "documents", all_data, batch_size=5000)

批量大小建议:

数据规模建议批量大小原因
< 1 万全部一次插入数据量小,一次搞定
1 万 ~ 10 万1000~5000平衡内存和吞吐
10 万 ~ 100 万5000~10000充分利用消息队列的批处理能力
> 100 万10000配合并行写入进一步提升吞吐

常见误区:批量太大导致内存溢出

有些同学觉得批量越大越好,一次性插入 100 万条数据——但这可能导致客户端内存溢出(每条 768 维向量约 3KB,100 万条约 3GB),或者消息队列积压导致 DataNode 来不及消费。建议批量大小控制在 5000~10000 条,通过分批插入来平衡内存和吞吐。


Flush:让数据可搜索

Milvus 的写入是异步的——你调用 insert() 后,数据先进入消息队列,DataNode 在后台消费并写入对象存储。在数据被 DataNode 处理并加载到 QueryNode 之前,这些数据是"搜不到"的。flush() 的作用就是强制 DataNode 立即处理缓冲区中的数据,确保数据可搜索。

python
# 插入数据
client.insert(collection_name="documents", data=batch_data)

# flush 确保数据可搜索
client.flush(collection_name="documents")

# 现在搜索就能找到刚插入的数据了
results = client.search(
    collection_name="documents",
    data=[[0.1] * 768],
    limit=5
)

Milvus Lite 自动 flush

如果你用的是 Milvus Lite(本地文件模式),insert() 后数据会自动 flush,不需要手动调用。但 Milvus Standalone 和 Cluster 需要手动 flush 或等待自动同步(默认间隔约 1 秒)。

常见误区:每次 insert 后都 flush

频繁 flush 会严重影响写入性能——每次 flush 都会触发 DataNode 的 Segment 持久化操作,产生大量小 Segment。正确的做法是:攒够一批数据后 flush 一次,而不是每插入几条就 flush。

python
# 错误:每次 insert 后都 flush
for doc in documents:
    client.insert(collection_name="documents", data=[doc])
    client.flush("documents")  # ❌ 性能灾难!

# 正确:批量 insert 后 flush 一次
client.insert(collection_name="documents", data=documents)
client.flush("documents")  # ✅ 只 flush 一次

Partition:按维度物理切分数据

Partition 是 Milvus 中一个重要的数据组织方式——它把 Collection 按某个维度(时间、分类、租户)物理切分成多个分区,搜索时可以只扫描相关分区,减少计算量。

手动创建 Partition

python
# 创建分区
client.create_partition(collection_name="documents", partition_name="tech")
client.create_partition(collection_name="documents", partition_name="science")

# 插入数据时指定分区
client.insert(
    collection_name="documents",
    data=tech_documents,
    partition_name="tech"     # 数据进入 tech 分区
)

client.insert(
    collection_name="documents",
    data=science_documents,
    partition_name="science"  # 数据进入 science 分区
)

# 搜索时只搜索特定分区
results = client.search(
    collection_name="documents",
    data=[[0.1] * 768],
    limit=5,
    partition_names=["tech"]  # 只搜索 tech 分区
)

Partition Key:自动分区路由

手动指定分区在数据量大时很麻烦——你需要在应用层判断每条数据应该进入哪个分区。Milvus 2.2.9+ 提供了 Partition Key 功能——建表时指定某个字段为 Partition Key,Milvus 会根据字段值自动路由到对应分区:

python
from pymilvus import FieldSchema, CollectionSchema, DataType

fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="category", dtype=DataType.VARCHAR, max_length=64, is_partition_key=True),  # 分区键
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768),
]
schema = CollectionSchema(fields=fields)

client.create_collection(collection_name="documents", schema=schema)

# 插入时不需要指定分区——Milvus 根据 category 自动路由
client.insert(
    collection_name="documents",
    data=[
        {"category": "tech", "embedding": [0.1] * 768},
        {"category": "science", "embedding": [0.2] * 768},
    ]
)

# 搜索时按 category 过滤——Milvus 自动只搜索对应分区
results = client.search(
    collection_name="documents",
    data=[[0.1] * 768],
    limit=5,
    filter='category == "tech"'  # 自动路由到 tech 分区
)

Partition Key 的好处是零代码侵入——你的应用代码不需要知道分区的存在,Milvus 自动处理路由。但有一个限制:Partition Key 字段的基数(不同值的数量)决定了分区数——如果 category 只有 3 个值(tech/science/art),就只有 3 个分区,粒度太粗。Milvus 会根据 num_partitions 参数(默认 64)自动创建指定数量的分区,然后根据哈希值把数据分配到不同分区。

分区数量建议

分区数不是越多越好——每个分区都有元数据管理开销,分区太多会增加协调器的负担。建议每个 Collection 的分区数在 64~512 之间。如果你有 1000 个租户需要隔离,不要创建 1000 个分区——可以用 Partition Key + 哈希,把 1000 个租户映射到 64 个分区中。


常见误区:Partition 能加速向量搜索

Partition 减少的是扫描范围,不是距离计算加速。搜索时指定分区,Milvus 只在对应分区的数据中计算距离,而不是全量扫描——这确实能提升搜索速度,但提升幅度取决于分区内的数据量。如果每个分区有 100 万条数据,搜索速度跟在一个 100 万条的 Collection 中搜索差不多。Partition 的真正价值是数据隔离(多租户)和管理便利(按时间分区方便清理旧数据),而不是搜索加速。


小结

这一节我们覆盖了 Milvus 的数据插入:基本 insert() 用法、批量插入的性能优化(建议 5000~10000 条/批)、flush 的时机(批量插入后 flush 一次,不要频繁 flush)、以及 Partition 的两种模式(手动分区和 Partition Key 自动路由)。下一节我们要深入 Milvus 最核心的操作——向量搜索 search()。

基于 MIT 许可发布