主题
7.1 DDP(DistributedDataParallel)基础
到目前为止,我们所有的训练都是在单张 GPU 上运行的。这在模型较小、数据量不大或者只是做快速实验时完全没问题——第 6 章我们甚至用 QLoRA 在单张 RTX 4090 上成功微调了 7B 模型。但当你面对以下场景时,单卡训练就不够用了:你想从头预训练一个 7B+ 的模型(需要数月时间)、你的数据集有几百 GB 需要更快的吞吐量、或者你需要在合理的时间内完成多轮超参数搜索。这时候就需要分布式训练——把计算任务分散到多张 GPU 甚至多台机器上并行执行。
分布式训练的核心思想简单来说就是:每张 GPU 负责处理数据的一部分,然后通过通信把结果同步起来。就像一个团队完成一个大项目——每个人负责一部分工作,定期开会同步进度,最终汇总成果。PyTorch 提供了几种不同的分布式策略,其中最基础也最广泛使用的是 DDP(DistributedDataParallel)。这一节我们将从 DDP 的工作原理开始,逐步学习它的正确使用方法、常见陷阱以及如何在 Lightning 和 HF Trainer 中启用它。
为什么需要分布式训练?
在深入技术细节之前,先理解一下为什么单卡训练会遇到瓶颈。假设你要在 Alpaca 数据集(约 52K 条指令数据)上微调一个 Qwen2.5-7B 模型:
瓶颈一:显存容量
7B BF16 模型本身需要 ~14GB,加上优化器状态 (~28GB)、梯度 (~14GB) 和激活值,全量微调的总需求超过 60GB。即使使用 LoRA + Gradient Checkpointing,一张 24GB 的 RTX 4090 也只能勉强塞下 batch_size=2~4 的配置。如果你想用更大的 batch size 来获得更稳定的梯度估计和更好的收敛效果,单卡就真的不够了。
瓶颈二:训练速度
即使显存够用,单卡的速度也可能不满足需求。假设你的 batch_size=4, seq_len=512, 使用 QLoRA 在 RTX 4090 上每个 step 大约需要 0.8 秒(包括前向、反向、优化器更新)。遍历一次 52K 条数据需要 52K / 4 = 13K 个 step,总时间 = 13K × 0.8s ≈ 2.9 小时 / epoch。3 个 epoch 就是 8.7 小时。如果你想做 10 组不同超参数的实验,那就是近 4 天的纯等待时间。
瓶颈三:数据规模
上面的例子用的是 52K 的 Alpaca 数据集。如果你的任务是大规模预训练,数据量可能是几十亿到几万亿 token 级别——在这种规模下单卡训练的时间成本是难以接受的。
分布式训练通过增加 GPU 数量来同时解决这三个问题:更多 GPU 意味着更大的总显存(可以增大 batch size)、更多的并行计算能力(线性加速)、以及更快的数据吞吐量。
DDP 工作原理
DistributedDataParallel 是 PyTorch 内置的数据并行方案。它的工作方式可以用以下几个步骤来描述:
初始化阶段:
┌───────────────────────────────────────────────────────┐
│ Rank 0 (GPU 0) Rank 1 (GPU 1) Rank 2 (GPU 2) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Model W │ │ Model W │ │ Model W │ │
│ │ (完整副本) │ │ (完整副本) │ │ (完整副本) │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ 各 rank 有: 完整相同的模型参数 │
│ 不同的数据子集 │
└───────────────────────────────────────────────────────┘
训练循环 (每个 step):
1. 每个 rank 从自己的 DataLoader 取一批数据
2. 前向传播 → 得到 loss
3. 反向传播 → 计算本地梯度
4. AllReduce: 所有 rank 交换梯度并取平均
5. 每个 rank 用平均后的梯度更新参数
6. 所有 rank 的参数保持同步关键点在于步骤 4:AllReduce 操作。当 Rank 0 完成反向传播后得到了自己那份数据产生的梯度 $g_0$,Rank 1 得到了 $g_1$,Rank 2 得到了 $g_2$。AllReduce 会把这些梯度收集起来、取平均值 $(g_0 + g_1 + g_2) / 3$,然后把平均值广播回所有 rank。这样每个 rank 都用同样的平均梯度来更新参数,确保所有 rank 上的模型始终保持一致。
从数学上看,DDP 等价于用更大的 batch size 做单卡训练——因为梯度是各 mini-batch 梯度的平均值,这和单卡上 batch_size=3×per_device_batch_size 的效果是一样的。区别在于 DDP 中每个 rank 只需存储 batch_size=per_device_batch_size 的激活值,而不是总 batch size 的激活值,所以显存占用不会随 GPU 数量线性增长。
手写 DDP 训练
让我们看看如何用原生 PyTorch API 实现 DDP 训练:
python
import os
import torch
import torch.distributed as dist
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
def setup_ddp():
"""初始化 DDP 进程组"""
dist.init_process_group(backend="nccl")
local_rank = int(os.environ.get("LOCAL_RANK", -1))
if local_rank == -1:
local_rank = int(os.environ.get("RANK", "0"))
torch.cuda.set_device(local_rank)
return local_rank, dist.get_world_size()
def cleanup_ddp():
"""清理 DDP 进程组"""
dist.destroy_process_group()
def ddp_training_loop(model_class, train_dataset, config):
local_rank, world_size = setup_ddp()
is_main = local_rank == 0
model = model_class(config).to(local_rank)
model = DDP(model, device_ids=[local_rank])
sampler = DistributedSampler(
train_dataset,
num_replicas=world_size,
rank=local_rank,
shuffle=True,
)
loader = torch.utils.data.DataLoader(
train_dataset,
batch_size=config['batch_size'],
sampler=sampler,
num_workers=config['num_workers'],
pin_memory=True,
)
optimizer = torch.optim.AdamW(model.parameters(), lr=config['lr'])
for epoch in range(config['epochs']):
sampler.set_epoch(epoch)
model.train()
for step, batch in enumerate(loader):
input_ids = batch['input_ids'].to(local_rank)
labels = batch['labels'].to(local_rank)
output = model(input_ids, labels=labels)
loss = output['loss']
optimizer.zero_grad()
loss.backward()
if config['max_grad_norm'] > 0:
torch.nn.utils.clip_grad_norm_(
model.parameters(), config['max_grad_norm']
)
optimizer.step()
if is_main and step % 50 == 0:
print(f"[Rank {local_rank}] Epoch {epoch} "
f"Step {step}: loss={loss.item():.4f}")
cleanup_ddp()
if __name__ == "__main__":
ddp_training_loop(GPT, train_dataset, {
'batch_size': 8,
'lr': 3e-4,
'epochs': 10,
'max_grad_norm': 1.0,
'num_workers': 4,
})这段代码中有几个关键的 DDP 特有组件需要逐一解释:
dist.init_process_group(backend="nccl")
这是 DDP 的第一步——初始化进程组。backend 参数指定通信后端:
"nccl":NVIDIA 的 NCCL 库,GPU 间通信的标准选择(最快)"gloo":CPU 通信或跨平台场景"mpi":MPI 后端(较少使用)
通常情况下用 "nccl" 就对了。
DistributedSampler — 比 shuffle 更重要的东西
注意这里我们没有使用 DataLoader 的 shuffle=True 参数,而是改用了 DistributedSampler。原因如下:如果每个 rank 都独立 shuffle 自己的数据,那么不同 rank 看到的数据顺序会不一致,虽然这不影响正确性(因为 AllReduce 会平均梯度),但在某些情况下可能导致训练动态不稳定。更重要的是,DistributedSampler 保证了一个 epoch 内每个样本恰好被一个 rank 处理一次(没有重复也没有遗漏),而简单的 shuffle 无法保证这一点。
sampler.set_epoch(epoch) 这一行至关重要——如果不调用,每个 epoch 的数据划分会完全相同,相当于 shuffle 没有生效。
model = DDP(model, device_ids=[local_rank])
这行代码把普通模型包装为 DDP 模型。device_ids 告诉 DDP 这个 rank 应该使用哪张 GPU。包装后的模型在使用上和原模型几乎一样——你可以正常调用 model(input)、访问 .parameters() 等。DDP 会在 backward() 时自动触发 AllReduce 同步梯度。
启动方式:torchrun
DDP 程序不能像普通 Python 脚本那样直接运行——你需要用 torchrun(或 python -m torch.distributed.launch)来启动多个进程:
bash
# 单机多卡(最常见)
torchrun --nproc_per_node=4 train_ddp.py
# 多机多卡
torchrun \
--nproc_per_node=4 \
--nnodes=2 \
--node_rank=0 \
--master_addr="10.0.0.1" \
--master_port=29500 \
train_ddp.py
# 或者用 accelerate launch(HuggingFace 推荐)
accelerate launch --multi_gpu --num_processes 4 train_ddp.py--nproc_per_node=4 表示在本机上启动 4 个进程,每个进程绑定一张 GPU。torchrun 会自动设置环境变量(如 LOCAL_RANK、WORLD_SIZE、RANK),我们的代码通过 os.environ.get("LOCAL_RANK") 读取这些变量来确定当前进程的身份。
DDP 关键注意事项
注意一:Batch Size 是 per-device
这是一个极其常见的误区。当你设置 per_device_train_batch_size=4 且使用 4 张 GPU 时,有效 batch size 是 16(4 × 4),不是 4。这意味着:
- 学习率可能需要调整(线性缩放规则:总 batch size 翻倍时学习率也可以翻倍)
gradient_accumulation_steps也是 per-device 的- 日志中的 loss 是基于 per-device batch 的,不是全局 batch 的
注意二:Learning Rate 线性缩放
由于 DDP 的有效 batch size = per_device_batch_size × num_gpus,根据 GPT-3 论文的经验法则,当 batch size 增大时学习率也应该按比例增大:
$$\text{lr}{\text{ddp}} = \text{lr}{\text{base}} \times \frac{N_{\text{gpu}}}{N_{\text{base}}}$$
比如原来单卡 batch_size=4 用 lr=3e-4;现在 4 卡 batch_size=4(等效 total batch=16),建议 lr 调整为 3e-4 × 4 = 1.2e-3。不过实践中很多人选择保持 lr 不变(尤其是使用了 warmup 的情况下),因为 warmup 本身就缓解了大 learning rate 初期的不稳定性。
注意三:find_unused_parameters 默认行为
DDP 在第一次 forward 时会记录哪些参数参与了计算(即出现在计算图中)。如果在后续 forward 中出现了之前未见过的参数(某些条件分支导致的不同路径),DDP 会报错。对于 LoRA 微调这种场景,由于 LoRA 参数始终参与计算,一般不会有这个问题。但如果遇到了,可以通过 DDP(model, find_unused_parameters=True) 来放宽限制——代价是每次 forward 都要额外检查一遍参数状态,略微影响性能。
注意四:验证集不应该 shuffle
DDP 的 DistributedSampler 默认 shuffle=True。这对训练集是正确的,但对验证集和测试集来说应该关闭 shuffle 以保证评估结果的确定性:
python
train_sampler = DistributedSampler(train_dataset, shuffle=True)
val_sampler = DistributedSampler(val_dataset, shuffle=False)
train_loader = DataLoader(train_dataset, sampler=train_sampler, ...)
val_loader = DataLoader(val_dataset, sampler=val_sampler, ...)注意五:只有 Rank 0 做 I/O 和日志
在 DDP 中,所有 rank 执行的是相同的代码。如果你在每个 rank上都打印日志、保存 checkpoint 或写入文件,会产生大量重复的 I/O 操作。标准做法是用 is_main 标志来控制只有主进程执行这些操作:
python
is_main = local_rank == 0
if is_main:
print("Training started...")
logger = WandbLogger(...)
checkpoint_callback = ModelCheckpoint(...)
trainer = Trainer(..., callbacks=[checkpoint_callback] if is_main else [])Lightning 和 HF Trainer 都已经自动处理了这个细节——它们内部只在 rank 0 上执行日志记录和 checkpoint 保存。
注意六:Gradient Synchronization Point
DDP 的梯度同步发生在 backward() 返回之后、optimizer.step() 之前。这意味着:
- 如果你在 backward 之后、step 之前做了额外的操作(比如自定义的梯度处理),这些操作是在同步后的梯度上执行的
- 如果你需要在同步前修改梯度(比如某些特殊的梯度裁剪策略),需要使用
model.no_sync()上下文管理器来临时禁用同步:
python
with model.no_sync():
# 这里的 backward() 不会触发 AllReduce
loss.backward()
# 这里手动做一些操作...
custom_gradient_processing(model.parameters())
# 下面的 step() 会自动触发同步
optimizer.step()这种高级用法在某些特殊场景下有用(比如梯度累积中的部分同步),大多数时候不需要关心这个细节。
到这里,我们已经掌握了 DDP 的基本用法。DDP 是分布式训练的入门方案——它简单、稳定、在模型能放入单卡显存时效率很高。但当模型太大以至于单卡都放不下时(比如 70B FP16 = 140GB),DDP 就无能为力了,因为每张卡都需要完整的模型副本。下一节我们将学习 FSDP(Fully Sharded Data Parallel),它通过将模型参数分片到各卡上来突破单卡显存的限制。