跳转到内容

7.1 DDP(DistributedDataParallel)基础

到目前为止,我们所有的训练都是在单张 GPU 上运行的。这在模型较小、数据量不大或者只是做快速实验时完全没问题——第 6 章我们甚至用 QLoRA 在单张 RTX 4090 上成功微调了 7B 模型。但当你面对以下场景时,单卡训练就不够用了:你想从头预训练一个 7B+ 的模型(需要数月时间)、你的数据集有几百 GB 需要更快的吞吐量、或者你需要在合理的时间内完成多轮超参数搜索。这时候就需要分布式训练——把计算任务分散到多张 GPU 甚至多台机器上并行执行。

分布式训练的核心思想简单来说就是:每张 GPU 负责处理数据的一部分,然后通过通信把结果同步起来。就像一个团队完成一个大项目——每个人负责一部分工作,定期开会同步进度,最终汇总成果。PyTorch 提供了几种不同的分布式策略,其中最基础也最广泛使用的是 DDP(DistributedDataParallel)。这一节我们将从 DDP 的工作原理开始,逐步学习它的正确使用方法、常见陷阱以及如何在 Lightning 和 HF Trainer 中启用它。

为什么需要分布式训练?

在深入技术细节之前,先理解一下为什么单卡训练会遇到瓶颈。假设你要在 Alpaca 数据集(约 52K 条指令数据)上微调一个 Qwen2.5-7B 模型:

瓶颈一:显存容量

7B BF16 模型本身需要 ~14GB,加上优化器状态 (~28GB)、梯度 (~14GB) 和激活值,全量微调的总需求超过 60GB。即使使用 LoRA + Gradient Checkpointing,一张 24GB 的 RTX 4090 也只能勉强塞下 batch_size=2~4 的配置。如果你想用更大的 batch size 来获得更稳定的梯度估计和更好的收敛效果,单卡就真的不够了。

瓶颈二:训练速度

即使显存够用,单卡的速度也可能不满足需求。假设你的 batch_size=4, seq_len=512, 使用 QLoRA 在 RTX 4090 上每个 step 大约需要 0.8 秒(包括前向、反向、优化器更新)。遍历一次 52K 条数据需要 52K / 4 = 13K 个 step,总时间 = 13K × 0.8s ≈ 2.9 小时 / epoch。3 个 epoch 就是 8.7 小时。如果你想做 10 组不同超参数的实验,那就是近 4 天的纯等待时间。

瓶颈三:数据规模

上面的例子用的是 52K 的 Alpaca 数据集。如果你的任务是大规模预训练,数据量可能是几十亿到几万亿 token 级别——在这种规模下单卡训练的时间成本是难以接受的。

分布式训练通过增加 GPU 数量来同时解决这三个问题:更多 GPU 意味着更大的总显存(可以增大 batch size)、更多的并行计算能力(线性加速)、以及更快的数据吞吐量。

DDP 工作原理

DistributedDataParallel 是 PyTorch 内置的数据并行方案。它的工作方式可以用以下几个步骤来描述:

初始化阶段:
┌───────────────────────────────────────────────────────┐
│  Rank 0 (GPU 0)     Rank 1 (GPU 1)    Rank 2 (GPU 2)   │
│  ┌──────────┐      ┌──────────┐      ┌──────────┐       │
│  │ Model W  │      │ Model W  │      │ Model W  │       │
│  │ (完整副本) │      │ (完整副本) │      │ (完整副本) │      │
│  └──────────┘      └──────────┘      └──────────┘       │
│                                                       │
│  各 rank 有: 完整相同的模型参数                          │
│  不同的数据子集                                          │
└───────────────────────────────────────────────────────┘

训练循环 (每个 step):
  1. 每个 rank 从自己的 DataLoader 取一批数据
  2. 前向传播 → 得到 loss
  3. 反向传播 → 计算本地梯度
  4. AllReduce: 所有 rank 交换梯度并取平均
  5. 每个 rank 用平均后的梯度更新参数
  6. 所有 rank 的参数保持同步

关键点在于步骤 4:AllReduce 操作。当 Rank 0 完成反向传播后得到了自己那份数据产生的梯度 $g_0$,Rank 1 得到了 $g_1$,Rank 2 得到了 $g_2$。AllReduce 会把这些梯度收集起来、取平均值 $(g_0 + g_1 + g_2) / 3$,然后把平均值广播回所有 rank。这样每个 rank 都用同样的平均梯度来更新参数,确保所有 rank 上的模型始终保持一致。

从数学上看,DDP 等价于用更大的 batch size 做单卡训练——因为梯度是各 mini-batch 梯度的平均值,这和单卡上 batch_size=3×per_device_batch_size 的效果是一样的。区别在于 DDP 中每个 rank 只需存储 batch_size=per_device_batch_size 的激活值,而不是总 batch size 的激活值,所以显存占用不会随 GPU 数量线性增长。

手写 DDP 训练

让我们看看如何用原生 PyTorch API 实现 DDP 训练:

python
import os
import torch
import torch.distributed as dist
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler


def setup_ddp():
    """初始化 DDP 进程组"""
    dist.init_process_group(backend="nccl")
    
    local_rank = int(os.environ.get("LOCAL_RANK", -1))
    if local_rank == -1:
        local_rank = int(os.environ.get("RANK", "0"))
    
    torch.cuda.set_device(local_rank)
    
    return local_rank, dist.get_world_size()


def cleanup_ddp():
    """清理 DDP 进程组"""
    dist.destroy_process_group()


def ddp_training_loop(model_class, train_dataset, config):
    local_rank, world_size = setup_ddp()
    
    is_main = local_rank == 0
    
    model = model_class(config).to(local_rank)
    model = DDP(model, device_ids=[local_rank])
    
    sampler = DistributedSampler(
        train_dataset,
        num_replicas=world_size,
        rank=local_rank,
        shuffle=True,
    )
    
    loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=config['batch_size'],
        sampler=sampler,
        num_workers=config['num_workers'],
        pin_memory=True,
    )
    
    optimizer = torch.optim.AdamW(model.parameters(), lr=config['lr'])
    
    for epoch in range(config['epochs']):
        sampler.set_epoch(epoch)
        
        model.train()
        for step, batch in enumerate(loader):
            input_ids = batch['input_ids'].to(local_rank)
            labels = batch['labels'].to(local_rank)
            
            output = model(input_ids, labels=labels)
            loss = output['loss']
            
            optimizer.zero_grad()
            loss.backward()
            
            if config['max_grad_norm'] > 0:
                torch.nn.utils.clip_grad_norm_(
                    model.parameters(), config['max_grad_norm']
                )
            
            optimizer.step()
            
            if is_main and step % 50 == 0:
                print(f"[Rank {local_rank}] Epoch {epoch} "
                      f"Step {step}: loss={loss.item():.4f}")
    
    cleanup_ddp()


if __name__ == "__main__":
    ddp_training_loop(GPT, train_dataset, {
        'batch_size': 8,
        'lr': 3e-4,
        'epochs': 10,
        'max_grad_norm': 1.0,
        'num_workers': 4,
    })

这段代码中有几个关键的 DDP 特有组件需要逐一解释:

dist.init_process_group(backend="nccl")

这是 DDP 的第一步——初始化进程组。backend 参数指定通信后端:

  • "nccl":NVIDIA 的 NCCL 库,GPU 间通信的标准选择(最快)
  • "gloo":CPU 通信或跨平台场景
  • "mpi":MPI 后端(较少使用)

通常情况下用 "nccl" 就对了。

DistributedSampler — 比 shuffle 更重要的东西

注意这里我们没有使用 DataLoader 的 shuffle=True 参数,而是改用了 DistributedSampler。原因如下:如果每个 rank 都独立 shuffle 自己的数据,那么不同 rank 看到的数据顺序会不一致,虽然这不影响正确性(因为 AllReduce 会平均梯度),但在某些情况下可能导致训练动态不稳定。更重要的是,DistributedSampler 保证了一个 epoch 内每个样本恰好被一个 rank 处理一次(没有重复也没有遗漏),而简单的 shuffle 无法保证这一点。

sampler.set_epoch(epoch) 这一行至关重要——如果不调用,每个 epoch 的数据划分会完全相同,相当于 shuffle 没有生效。

model = DDP(model, device_ids=[local_rank])

这行代码把普通模型包装为 DDP 模型。device_ids 告诉 DDP 这个 rank 应该使用哪张 GPU。包装后的模型在使用上和原模型几乎一样——你可以正常调用 model(input)、访问 .parameters() 等。DDP 会在 backward() 时自动触发 AllReduce 同步梯度。

启动方式:torchrun

DDP 程序不能像普通 Python 脚本那样直接运行——你需要用 torchrun(或 python -m torch.distributed.launch)来启动多个进程:

bash
# 单机多卡(最常见)
torchrun --nproc_per_node=4 train_ddp.py

# 多机多卡
torchrun \
    --nproc_per_node=4 \
    --nnodes=2 \
    --node_rank=0 \
    --master_addr="10.0.0.1" \
    --master_port=29500 \
    train_ddp.py

# 或者用 accelerate launch(HuggingFace 推荐)
accelerate launch --multi_gpu --num_processes 4 train_ddp.py

--nproc_per_node=4 表示在本机上启动 4 个进程,每个进程绑定一张 GPU。torchrun 会自动设置环境变量(如 LOCAL_RANKWORLD_SIZERANK),我们的代码通过 os.environ.get("LOCAL_RANK") 读取这些变量来确定当前进程的身份。

DDP 关键注意事项

注意一:Batch Size 是 per-device

这是一个极其常见的误区。当你设置 per_device_train_batch_size=4 且使用 4 张 GPU 时,有效 batch size 是 16(4 × 4),不是 4。这意味着:

  • 学习率可能需要调整(线性缩放规则:总 batch size 翻倍时学习率也可以翻倍)
  • gradient_accumulation_steps 也是 per-device 的
  • 日志中的 loss 是基于 per-device batch 的,不是全局 batch 的

注意二:Learning Rate 线性缩放

由于 DDP 的有效 batch size = per_device_batch_size × num_gpus,根据 GPT-3 论文的经验法则,当 batch size 增大时学习率也应该按比例增大:

$$\text{lr}{\text{ddp}} = \text{lr}{\text{base}} \times \frac{N_{\text{gpu}}}{N_{\text{base}}}$$

比如原来单卡 batch_size=4 用 lr=3e-4;现在 4 卡 batch_size=4(等效 total batch=16),建议 lr 调整为 3e-4 × 4 = 1.2e-3。不过实践中很多人选择保持 lr 不变(尤其是使用了 warmup 的情况下),因为 warmup 本身就缓解了大 learning rate 初期的不稳定性。

注意三:find_unused_parameters 默认行为

DDP 在第一次 forward 时会记录哪些参数参与了计算(即出现在计算图中)。如果在后续 forward 中出现了之前未见过的参数(某些条件分支导致的不同路径),DDP 会报错。对于 LoRA 微调这种场景,由于 LoRA 参数始终参与计算,一般不会有这个问题。但如果遇到了,可以通过 DDP(model, find_unused_parameters=True) 来放宽限制——代价是每次 forward 都要额外检查一遍参数状态,略微影响性能。

注意四:验证集不应该 shuffle

DDP 的 DistributedSampler 默认 shuffle=True。这对训练集是正确的,但对验证集和测试集来说应该关闭 shuffle 以保证评估结果的确定性:

python
train_sampler = DistributedSampler(train_dataset, shuffle=True)
val_sampler = DistributedSampler(val_dataset, shuffle=False)

train_loader = DataLoader(train_dataset, sampler=train_sampler, ...)
val_loader = DataLoader(val_dataset, sampler=val_sampler, ...)

注意五:只有 Rank 0 做 I/O 和日志

在 DDP 中,所有 rank 执行的是相同的代码。如果你在每个 rank上都打印日志、保存 checkpoint 或写入文件,会产生大量重复的 I/O 操作。标准做法是用 is_main 标志来控制只有主进程执行这些操作:

python
is_main = local_rank == 0

if is_main:
    print("Training started...")
    logger = WandbLogger(...)
    checkpoint_callback = ModelCheckpoint(...)

trainer = Trainer(..., callbacks=[checkpoint_callback] if is_main else [])

Lightning 和 HF Trainer 都已经自动处理了这个细节——它们内部只在 rank 0 上执行日志记录和 checkpoint 保存。

注意六:Gradient Synchronization Point

DDP 的梯度同步发生在 backward() 返回之后、optimizer.step() 之前。这意味着:

  • 如果你在 backward 之后、step 之前做了额外的操作(比如自定义的梯度处理),这些操作是在同步后的梯度上执行的
  • 如果你需要在同步前修改梯度(比如某些特殊的梯度裁剪策略),需要使用 model.no_sync() 上下文管理器来临时禁用同步:
python
with model.no_sync():
    # 这里的 backward() 不会触发 AllReduce
    loss.backward()

# 这里手动做一些操作...
custom_gradient_processing(model.parameters())

# 下面的 step() 会自动触发同步
optimizer.step()

这种高级用法在某些特殊场景下有用(比如梯度累积中的部分同步),大多数时候不需要关心这个细节。

到这里,我们已经掌握了 DDP 的基本用法。DDP 是分布式训练的入门方案——它简单、稳定、在模型能放入单卡显存时效率很高。但当模型太大以至于单卡都放不下时(比如 70B FP16 = 140GB),DDP 就无能为力了,因为每张卡都需要完整的模型副本。下一节我们将学习 FSDP(Fully Sharded Data Parallel),它通过将模型参数分片到各卡上来突破单卡显存的限制。

基于 MIT 许可发布