Pytorch实现并行训练通常有两个接口:DP(DataParallel)和DDP(DistributedDataParallel)。目前DP(DataParallel)已经被Pytorch官方deprecate掉了,原因有二:1,DP(DataParallel)只支持单机多卡,无法支持多机多卡;2,DP(DataParallel)即便在单机多卡模式下效率也不及DDP(DistributedDataParallel)。我们可以看一下官方文档里的描述:
DistributedDataParallel is proven to be significantly faster than torch.nn.DataParallel for single-node multi-GPU data parallel training.
基于这两个缺点,DP(DataParallel)本文就不介绍了,即使DP(DataParallel)更好上手。本文重点讲解如何使用DDP(DistributedDataParallel)来完成并行训练。
官方文档:https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html
官方视频教程:https://pytorch.org/tutorials/beginner/ddp_series_intro.html
DDP
DDP是基于多进程来实现并行训练,每个GPU依靠独立的进程来驱动,进程之间有特殊的通信机制。先介绍几个变量:
变量名 | 值 | 意义 |
---|---|---|
rank | 0 | 在节点中的编号,比如当前是第0张GPU |
world_size | 4 | 整个节点数量,比如一共4张GPU |
我们先看一个单GPU代码,然后将其转换成DDP并行训练代码,观察变化来进行学习。
单GPU版本:(实验代码来自https://github.com/pytorch/examples/tree/main/distributed/ddp-tutorial-series)
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
class MyTrainDataset(Dataset):
def __init__(self, size):
self.size = size
self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)]
def __len__(self):
return self.size
def __getitem__(self, index):
return self.data[index]
class Trainer:
def __init__(
self,
model: torch.nn.Module,
train_data: DataLoader,
optimizer: torch.optim.Optimizer,
gpu_id: int,
save_every: int,
) -> None:
self.gpu_id = gpu_id
self.model = model.to(gpu_id)
self.train_data = train_data
self.optimizer = optimizer
self.save_every = save_every
def _run_batch(self, source, targets):
self.optimizer.zero_grad()
output = self.model(source)
loss = F.cross_entropy(output, targets)
loss.backward()
self.optimizer.step()
def _run_epoch(self, epoch):
b_sz = len(next(iter(self.train_data))[0])
print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
for source, targets in self.train_data:
source = source.to(self.gpu_id)
targets = targets.to(self.gpu_id)
self._run_batch(source, targets)
def _save_checkpoint(self, epoch):
ckp = self.model.state_dict()
PATH = "checkpoint.pt"
torch.save(ckp, PATH)
print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")
def train(self, max_epochs: int):
for epoch in range(max_epochs):
self._run_epoch(epoch)
if epoch % self.save_every == 0:
self._save_checkpoint(epoch)
def load_train_objs():
train_set = MyTrainDataset(2048) # load your dataset
model = torch.nn.Linear(20, 1) # load your model
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
return train_set, model, optimizer
def prepare_dataloader(dataset: Dataset, batch_size: int):
return DataLoader(
dataset,
batch_size=batch_size,
pin_memory=True,
shuffle=True
)
def main(device, total_epochs, save_every, batch_size):
dataset, model, optimizer = load_train_objs()
train_data = prepare_dataloader(dataset, batch_size)
trainer = Trainer(model, train_data, optimizer, device, save_every)
trainer.train(total_epochs)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='simple distributed training job')
parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
parser.add_argument('save_every', type=int, help='How often to save a snapshot')
parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
args = parser.parse_args()
device = 0 # shorthand for cuda:0
main(device, args.total_epochs, args.save_every, args.batch_size)
直接复制上面代码,保存为single_gpu.py
,然后用以下命令就可以让代码运行:
python single_gpu.py 10000 1000
这一步如果运行失败,请留言告知。如果运行成功,我们观察GPU,会发现该程序只会调用第一张GPU。我们将上述代码稍微改进,就可以得到多GPU版本了:
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os
class MyTrainDataset(Dataset):
def __init__(self, size):
self.size = size
self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)]
def __len__(self):
return self.size
def __getitem__(self, index):
return self.data[index]
def ddp_setup(rank, world_size):
"""
Args:
rank: Unique identifier of each process
world_size: Total number of processes
"""
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "12355"
init_process_group(backend="nccl", rank=rank, world_size=world_size)
torch.cuda.set_device(rank)
class Trainer:
def __init__(
self,
model: torch.nn.Module,
train_data: DataLoader,
optimizer: torch.optim.Optimizer,
gpu_id: int,
save_every: int,
) -> None:
self.gpu_id = gpu_id
self.model = model.to(gpu_id)
self.train_data = train_data
self.optimizer = optimizer
self.save_every = save_every
self.model = DDP(model, device_ids=[gpu_id])
def _run_batch(self, source, targets):
self.optimizer.zero_grad()
output = self.model(source)
loss = F.cross_entropy(output, targets)
loss.backward()
self.optimizer.step()
def _run_epoch(self, epoch):
b_sz = len(next(iter(self.train_data))[0])
print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
self.train_data.sampler.set_epoch(epoch)
for source, targets in self.train_data:
source = source.to(self.gpu_id)
targets = targets.to(self.gpu_id)
self._run_batch(source, targets)
def _save_checkpoint(self, epoch):
ckp = self.model.module.state_dict()
PATH = "checkpoint.pt"
torch.save(ckp, PATH)
print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")
def train(self, max_epochs: int):
for epoch in range(max_epochs):
self._run_epoch(epoch)
if self.gpu_id == 0 and epoch % self.save_every == 0:
self._save_checkpoint(epoch)
def load_train_objs():
train_set = MyTrainDataset(2048) # load your dataset
model = torch.nn.Linear(20, 1) # load your model
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
return train_set, model, optimizer
def prepare_dataloader(dataset: Dataset, batch_size: int):
return DataLoader(
dataset,
batch_size=batch_size,
pin_memory=True,
shuffle=False,
sampler=DistributedSampler(dataset)
)
def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
ddp_setup(rank, world_size)
dataset, model, optimizer = load_train_objs()
train_data = prepare_dataloader(dataset, batch_size)
trainer = Trainer(model, train_data, optimizer, rank, save_every)
trainer.train(total_epochs)
destroy_process_group()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='simple distributed training job')
parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
parser.add_argument('save_every', type=int, help='How often to save a snapshot')
parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
args = parser.parse_args()
world_size = torch.cuda.device_count()
mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size)
将上述代码保存为mp_train.py
,然后直接运行:
python mp_train.py 10000 1000
我们再观察GPU,会发现所有GPU均被调用。到此,实验部分已经结束。下面进入分析阶段。
从单卡变为多卡,需要进行哪些修改?
我们可以简单比对以下差别:
vimdiff single_gpu.py mp_train.py
vimdiff
可以比较两个文档的差别,这是一个小trick,分享给大家。
- 第一步:初始化
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os
def ddp_setup(rank, world_size):
"""
Args:
rank: Unique identifier of each process
world_size: Total number of processes
"""
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "12355"
init_process_group(backend="nccl", rank=rank, world_size=world_size)
torch.cuda.set_device(rank)
init_process_group
就是初始化群组,相当于给GPU们打了个招呼,我们要进行并行训练啦。这里的"os.environ"可以不用修改,因为只是单机多卡,不涉及多机多卡。"nccl"
表示Nvidia Collective Communications Library,是一个跨GPU的通信后端类型,一般也不用修改。这里需要注意的是rank
和world_size
,比如咱们是4卡机,rank的取值范围是[0,1,2,3],而world_size=4。
- 第二步:改造模型和数据加载器
左边是单卡模式,右边是多卡模式。 - 步骤三:用多进程启动
world_size = torch.cuda.device_count()
mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size)
如有不解,欢迎留言讨论~