
I/O Performance Optimization

I/O Format Performance Comparison

| Format  | Write 1M rows | Read 1M rows  | File size | Column pruning |
|---------|---------------|---------------|-----------|----------------|
| CSV     | baseline (1x) | baseline (1x) | 100%      | No             |
| JSONL   | ~0.8x         | ~0.7x         | ~80%      | No             |
| Parquet | ~3-5x         | ~5-10x        | ~23%      | Yes            |
| Feather | ~4x           | ~8x           | ~25%      | Yes            |

Speed multipliers are relative to the CSV baseline (higher is faster); file size is relative to the CSV file.
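
These multipliers vary with hardware, data shape, and library versions. A minimal sketch like the following can reproduce the comparison locally (the /tmp paths and the timing helper are illustrative, not part of any benchmark suite):

python
import time
import pandas as pd
import numpy as np

# Synthetic frame roughly matching the benchmark above
n = 1_000_000
df = pd.DataFrame({
    'model': np.random.choice(['GPT-4o', 'Claude', 'Llama'], n),
    'score': np.random.uniform(60, 98, n),
})

def timed(label, fn):
    """Run fn once and print the wall-clock time."""
    start = time.perf_counter()
    fn()
    print(f"{label}: {time.perf_counter() - start:.2f}s")

timed('CSV write',     lambda: df.to_csv('/tmp/bench.csv', index=False))
timed('Parquet write', lambda: df.to_parquet('/tmp/bench.parquet', index=False))
timed('CSV read',      lambda: pd.read_csv('/tmp/bench.csv'))
timed('Parquet read',  lambda: pd.read_parquet('/tmp/bench.parquet'))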

Parquet Best Practices

Write Optimization

python
import pandas as pd
import numpy as np

np.random.seed(42)
n = 1_000_000

# 1M rows of synthetic LLM evaluation records
df = pd.DataFrame({
    'model': np.random.choice(['GPT-4o', 'Claude', 'Llama', 'Qwen'], n),
    'task': np.random.choice(['chat', 'code', 'math'], n),
    'score': np.random.uniform(60, 98, n).round(2),
    'latency_ms': np.random.randint(100, 5000, n),
    'timestamp': pd.date_range('2025-01-01', periods=n, freq='s'),
})

# Single-file write; index=False avoids storing a meaningless RangeIndex
df.to_parquet('/tmp/llm_data.parquet', engine='pyarrow', index=False)

# Partitioned write: one subdirectory per model value, enabling partition pruning on read
df.to_parquet(
    '/tmp/llm_partitioned',
    partition_cols=['model'],
    engine='pyarrow',
    index=False,
)
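
Compression and row-group size also affect file size and scan speed. A minimal sketch, continuing with the `df` created above and assuming the pyarrow engine; these are optional tuning knobs, and the defaults (snappy compression) are already reasonable:

python
df.to_parquet(
    '/tmp/llm_data_zstd.parquet',
    engine='pyarrow',
    compression='zstd',      # smaller files than the default snappy, still fast to decode
    row_group_size=200_000,  # forwarded to pyarrow.parquet.write_table
    index=False,
)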

Read optimization: load only the columns you need

python
import pandas as pd

# Column pruning: Parquet is columnar, so the other columns are never read from disk
subset = pd.read_parquet(
    '/tmp/llm_data.parquet',
    columns=['model', 'score', 'latency_ms']
)

print(f"读取 {len(subset)} 行, {len(subset.columns)} 列")
print(subset.head())
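
When you are unsure which columns a file contains, the footer metadata can be inspected without reading any row data. A sketch using pyarrow directly (the same engine pandas uses here):

python
import pyarrow.parquet as pq

pf = pq.ParquetFile('/tmp/llm_data.parquet')
print(pf.schema_arrow)  # column names and types, read from the footer only
print(f"rows: {pf.metadata.num_rows:,}, row groups: {pf.metadata.num_row_groups}")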

Read optimization: partition filter pushdown

python
import pandas as pd

# Partition pruning: only the model=GPT-4o subdirectory is read
gpt_only = pd.read_parquet(
    '/tmp/llm_partitioned',
    filters=[('model', '=', 'GPT-4o')],
    columns=['task', 'score', 'latency_ms'],
)
print(f"GPT-4o data: {len(gpt_only):,} rows")

CSV Read Optimization Tips

python
import pandas as pd
import numpy as np

n = 500_000
csv_file = '/tmp/large_llm.csv'

# Build a 500k-row CSV to exercise the read optimizations below
large_df = pd.DataFrame({
    'id': range(n),
    'model': np.random.choice(['GPT-4o', 'Claude', 'Llama'], n),
    'prompt': [f'test data {i}' for i in range(n)],
    'score': np.random.uniform(0.6, 0.99, n).round(4),
})
large_df.to_csv(csv_file, index=False)

# Tip 1: declare dtypes up front and read only the needed columns;
# 'category' for low-cardinality strings cuts memory sharply
dtypes = {
    'id': 'int32',
    'model': 'category',
    'score': 'float32',
}

df_opt = pd.read_csv(csv_file, dtype=dtypes, usecols=['id', 'model', 'score'])
print(f"Optimized read: {len(df_opt):,} rows")

# Tip 2: peek at the first rows to inspect structure before a full load
sample = pd.read_csv(csv_file, nrows=1000)
print(f"Sample: {len(sample)} rows")

# Tip 3: stream the file in chunks and aggregate incrementally.
# Accumulating sums and counts keeps the final mean exact even if
# chunks differ in size (a plain mean of chunk means would not).
chunk_results = []
for chunk in pd.read_csv(csv_file, chunksize=50_000):
    chunk_results.append(chunk.groupby('model')['score'].agg(['sum', 'count']))

totals = pd.concat(chunk_results).groupby(level=0).sum()
final = totals['sum'] / totals['count']
print("\nPer-model mean score (chunked aggregation):")
print(final.round(4))
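
If pyarrow is installed, read_csv can also delegate parsing to its multithreaded CSV reader, which is often several times faster than the default C engine on large files. A sketch, assuming pandas ≥ 1.4 (note that chunksize is not supported with this engine):

python
import pandas as pd

fast = pd.read_csv('/tmp/large_llm.csv', engine='pyarrow')
print(f"pyarrow engine: {len(fast):,} rows")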

LLM Scenario: Loading Large Corpora Efficiently

python
import pandas as pd
import os

class EfficientLoader:
    """高效数据加载器"""

    @staticmethod
    def load_sft_corpus(path_pattern, columns=None, sample_ratio=1.0):
        """高效加载 SFT 语料"""
        import glob

        files = sorted(glob.glob(path_pattern))
        if not files:
            raise FileNotFoundError(f"No files matched: {path_pattern}")

        print(f"Found {len(files)} files")

        all_dfs = []
        for f in files:
            file_size_mb = os.path.getsize(f) / 1024 / 1024
            ext = os.path.splitext(f)[1].lstrip('.').lower()

            if ext == 'parquet':
                df = pd.read_parquet(f, columns=columns)
            elif ext == 'jsonl':
                df = pd.read_json(f, lines=True, dtype={'instruction': 'string'})
            elif ext == 'csv':
                df = pd.read_csv(f, usecols=columns)
            else:
                continue  # skip unsupported extensions

            df['_source_file'] = os.path.basename(f)  # track row provenance
            all_dfs.append(df)
            print(f"  ✓ {os.path.basename(f)}: "
                  f"{file_size_mb:.1f} MB, {len(df):,} rows")

        combined = pd.concat(all_dfs, ignore_index=True)

        # Optional downsampling for quick experiments
        if sample_ratio < 1.0:
            n_sample = int(len(combined) * sample_ratio)
            combined = combined.sample(n_sample, random_state=42)

        total_mem = combined.memory_usage(deep=True).sum() / 1024 / 1024
        print(f"\nTotal: {len(combined):,} rows × {len(combined.columns)} columns")
        print(f"Memory usage: {total_mem:.2f} MB")

        return combined


# load_sft_corpus is a staticmethod, so no instance is needed
print("=== Efficient loading strategy ===")
