最近在学detr,顺便学了一下多卡训模型,它的源码写的非常不错!
我自己在detr的代码的基础上实现了一个vae的训练,在mnist数据集上,4张2080上,batch size开到1024,训练快到飞起。
总结一下多卡训练的流程,以及自己遇到的一些坑,一方面方便自己之后回顾,另一方面能帮到大家最好。
可以分几个文件放置不同功能,首先是main.py。
第一步是初始化,初始化各个gpu,每个gpu在用的时候会有编号,这样代码才知道我在那个gpu上跑的。
其中涉及到一些概念,比如说world_size,node,rank,local_rank等。
简单解释一下world_size就是进程总数,因为每个进程对应着一个gpu,所以world_size就是gpu的总数,像我现在的配置,一个机器四张卡,那我的world_size就是4。
node就是计算节点的数目,像我只有一台机子,node那就是1。
rank和local_rank是gpu的编号,对于只有一个机子的情况,两个是一样的,第一个gpu的rank和local_rank就是0,第二个gpu的rank和local_rank就是1...
更加具体的内容可以看这里local_rank,rank,node等理解_rank world size-CSDN博客。
代码的化就是这一行
init_distributed_mode(config)
config是啥先不用管,它就是一个字典,里面是使用argparser产生的参数字典。这个函数的作用就是给这个进程指定它的rank,local_rank等信息,我理解是你用
python -m torch.distributed.launch --nproc_per_node=8 --use_env main.py
启动多卡时,这个main.py会在每个卡上都执行,每个卡分配一个进程,也就是我如果有四张gpu的话,那这个main.py就会执行四遍,我也不知道理解的对不对,欢迎佬们批评指正。
下面几个是关于并行的辅助函数,放在了utils.py中
`init_distributed_mode`长下面这个样子,按道理说这个函数抄了用就行,我还是稍微解释一下。
def init_distributed_mode(config):
if 'RANK' in os.environ and 'WORLD_SIZE' in os.environ:
config.rank = int(os.environ["RANK"])
config.world_size = int(os.environ['WORLD_SIZE'])
config.gpu = int(os.environ['LOCAL_RANK'])
elif 'SLURM_PROCID' in os.environ:
config.rank = int(os.environ['SLURM_PROCID'])
config.gpu = config.rank % torch.cuda.device_count()
else:
print('Not using distributed mode')
config.distributed = False
return
config.distributed = True
torch.cuda.set_device(config.gpu)
config.dist_backend = 'nccl'
print('| distributed init (rank {}): {}'.format(
config.rank, config.dist_url), flush=True)
torch.distributed.init_process_group(backend=config.dist_backend, init_method=config.dist_url,
world_size=config.world_size, rank=config.rank)
torch.distributed.barrier()
setup_for_distributed(config.rank == 0)
当用上面的那个多卡启动命令的时候,环境变量里面就会自己产生RANK,WORLD_SIZE,LOCAL_RANK等,没启动的时候你去查会发现没有,我认为是pytorch自己内部做了啥动作。这个时候每个gpu的RANK,LOCAL_RANK会不一样,那么每个进程就可以知道自己是哪个gpu了。把这些RANK,WORLD_SIZE,LOCAL_RANK加到config这字典里,后面就可以用了。
下面那些命令我觉得抄上就行了。
其中值的说明一下的是
torch.cuda.set_device(config.gpu)
这个命令应该就是设置你的gpu是config.gpu这个,等你用`model.cuda()`或者`model.to('cuda')`的时候,不会默认给你搬到第一张gpu上,而是搬到config.gpu这张上去。参考:torch.device和torch.cuda.set_device()_torch.device('cuda:0')-CSDN博客
torch.distributed.barrier()
这个说的是,如果你第一张gpu准备好了,而其他的gpu还没准备好,那就等在这,等其他都完成,,起一个同步的作用。
setup_for_distributed(config.rank == 0)
这个是关掉其他卡上的打印操作,旨在rank=0,也就是第一张gpu的进程上显示打印信息,不然你控制台就太难看了,好多重复信息。这个函数也是你自己写的,抄上就行,具体我也看不懂。
这个函数长这样
def setup_for_distributed(is_master):
"""
This function disables printing when not in master process
"""
import builtins as __builtin__
builtin_print = __builtin__.print
def print(*args, **kwargs):
force = kwargs.pop('force', False)
if is_master or force:
builtin_print(*args, **kwargs)
__builtin__.print = print
ok初始化到这就完成了。
接着回到main.py中。
然后是设置随机种子,这个是必须设的,要保证可复现和不同gpu上一样的随机,否则模型就不一样了,你按照不同的seed初始化模型参数,不同gpu上模型初始化参数不一样,这样是不行的。
seed = config.seed + get_rank()
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)
这个地方我有点迷,detr里面的seed为啥是我设置的seed加上我当前进程的rank?那我不同gpu上的seed不就不一样了?比如config.seed=42,那gpu0-3的seed分别是42,43,44,45,这样我训练的时候能保证模型一模一样吗?不是说DistributedDataParallel不同gpu上模型是一模一样的吗,只是数据分布到不同gpu上?我不太懂,希望有明白的佬提点一下。
这里的`get_rank`也非常简单,
def is_dist_avail_and_initialized():
if not dist.is_available():
return False
if not dist.is_initialized():
return False
return True
def get_rank():
if not is_dist_avail_and_initialized():
return 0
return dist.get_rank()
`dist`是`import torch.distributed as dist` 。
接着就是定义模型,然后把模型搬到device上去,这里的config.deice='cuda'。
model = VAE(config.in_channels, config.latent_dim, config.hidden_dim)
model.to(device)
接着就是把模型变成DDP的模型,我认为封装后的model里面会有一些同步参数的行为,但是我们用户不太需要了解。
model_without_ddp = model
if config.distributed:
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[config.gpu])
model_without_ddp = model.module
这里的`model.module`是把封装好的类里面的模型取出来,因为封装后的DDP模型其实变了好多,model.parameters等方法都没了,没法直接操作模型,而`model.moudle`的作用就是把原来的model取出来,这个model_without_ddp就可以方便我们导入预训练参数,冻结参数等等操作了。
然后就是定义优化器和学习率策略,
optimizer = optim.Adam(model_without_ddp.parameters(), lr=config.LR, weight_decay=config.weight_decay)
lr_scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=config.scheduler_gamma)
接着是数据的导入,和data_loader的构建,这地方是和原始单gpu有差异的地方之一。
transform=transforms.Compose([
transforms.ToTensor()
])
train_dataset = MNIST(config.data_path, train=True, download=True, transform=transform)
val_dataset = MNIST(config.data_path, train=False, download=True, transform=transform)
if config.distributed:
train_sampler = DistributedSampler(train_dataset)
val_sampler = DistributedSampler(val_dataset, shuffle=False)
else:
train_sampler = RandomSampler(train_dataset)
val_sampler = SequentialSampler(val_dataset)
batch_train_sampler = BatchSampler(train_sampler, config.batch_size, drop_last=True)
batch_val_sampler = BatchSampler(val_sampler, config.batch_size, drop_last=False)
train_loader = DataLoader(train_dataset,
batch_sampler=batch_train_sampler,
num_workers=config.num_workers,
collate_fn=collate_fn)
val_loader = DataLoader(val_dataset,
batch_sampler=batch_val_sampler,
drop_last=False,
num_workers=config.num_workers,
collate_fn=collate_fn)
第一个是torch.utils.data.DistributedSampler,这函数的就是给不同的gpu分配不通的数据,简单来说就是把编号1,3,5,...的数据分给gpu0,2,4,8,...分给gpu2等等,这一步只是把索引给分好。
第二个BatchSampler就是在把数据进行batch化。
Sampler本质就是个数据分配的东西,指定哪个数据到哪个batch,哪个设备上取训练。
现在就可以开始训练了,第一个有一步是
for epoch in tqdm(range(config.epochs)):
if config.distributed:
train_sampler.set_epoch(epoch)
`set_epoch`是必须的用于启动DataLoader的shuffle功能,否则数据集还是顺序的,sampler没用,每个epoch的数据顺序一样,参考https://blog.csdn.net/YoJayC/article/details/121532525。官方的教程里是这么说的,
后面就没啥了,训练更新参数和学习率
train_stats = train_one_epoch(model, train_loader, optimizer, **config)
lr_scheduler.step()
`train_one_epoch`和单卡写法一样,但是里面有个地方,也是非常重要的一个点,就是怎么同步结果,比如loss怎么处理。因为我这个gpu进程得到的是这个进程上数据的loss,比如我有四张卡,第一张卡上batch size有64个数据,会产生一个loss,但是四张卡按理来说是256的batch size,我们的做法应该是把四张卡的loss加起来除以四。当然loss的backward还是跟原来一样,还是在64的batch size上去做哈,只是我们记录的时候呢要把四张卡的loss加起来,在rank=0的卡上记录。
具体做法依赖于一个函数就是`dist.all_reduce`,用于同步的函数。以下面这个函数为例
def get_world_size():
if not is_dist_avail_and_initialized():
return 1
return dist.get_world_size()
def reduce_dict(input_dict, average=True):
"""
Args:
input_dict (dict): all the values will be reduced
average (bool): whether to do average or sum
Reduce the values in the dictionary from all processes so that all processes
have the averaged results. Returns a dict with the same fields as
input_dict, after reduction.
"""
world_size = get_world_size()
if world_size < 2:
return input_dict
with torch.no_grad():
names = []
values = []
# sort the keys so that they are consistent across processes
for k in sorted(input_dict.keys()):
names.append(k)
values.append(input_dict[k])
values = torch.stack(values, dim=0)
dist.all_reduce(values)
if average:
values /= world_size
reduced_dict = {k: v for k, v in zip(names, values)}
return reduced_dict
我的这个`input_dict`其实是一个字典,里面每个值都是每个进程产生的结果,每个进程产生的结果都不一样,比如第一张卡是{"loss1":0,2, "loss2": 0.11, "loss3": 0.45},第二张卡是{"loss1":0,23, "loss2": 0.01, "loss3": 0.42}等等,做法是首先在每张卡上把loss1,loss2, loss3的内容stack起来成一个张量,比如第一张卡成为tensor([0.2, 0.11, 0.45]),第二张卡为tensor([0.23, 0.01, 0.42])等等,那么dist.all_reduce(values)就会把所有结果加起来,再除以我们的gpu数量,那就是256的batch size的loss了。
`train_one_epoch`代码如下。
import torch
from utils import *
import os
import sys
import math
def train_one_epoch(model:torch.nn.Module,
data_loader: torch.utils.data.DataLoader,
optimizer: torch.optim.Optimizer,
**kwargs):
model.train()
loss_lst = []
recons_loss_lst = []
kld_loss_lst = []
for sample_batch in data_loader:
sample_batch.to(kwargs["device"])
results = model(sample_batch)
train_stats = model.module.loss_function(results, **kwargs)
loss = train_stats['loss']
# reduce losses over all GPUs for logging purposes
train_stats_reduced = reduce_dict(train_stats)
loss_reduced = train_stats_reduced["loss"].item()
loss_lst.append(loss_reduced)
recons_loss_lst.append(train_stats_reduced["Reconstruction_loss"].item())
kld_loss_lst.append(train_stats_reduced["KLD"].item())
if not math.isfinite(loss_reduced):
print("Loss is {}, stopping training".format(loss_reduced))
print(train_stats_reduced)
sys.exit(1)
optimizer.zero_grad()
loss.backward()
if kwargs['clip_max_norm'] > 0:
torch.nn.utils.clip_grad_norm_(model.parameters(), kwargs['clip_max_norm'])
optimizer.step()
return {'loss': sum(loss_lst)/len(loss_lst), "Reconstruction_loss": sum(recons_loss_lst)/len(recons_loss_lst), "KLD": sum(kld_loss_lst)/len(kld_loss_lst)}
注意一个地方就是`sample_batch.to(kwargs["device"])`,kwargs["device"]是我们config里面的device设置为‘cuda’没有指定那一张卡,它会默认搬到第一步初始化的`torch.cuda.set_device(config.gpu)`上去。
最后是保存模型,要在第一张卡上去保存,用函数`is_main_process`判断一下,保存就行了。
def is_main_process():
return get_rank() == 0
def save_on_master(*args, **kwargs):
if is_main_process():
torch.save(*args, **kwargs)
save_on_master(
{
"model": model_without_ddp.state_dict(),
"optimizer": optimizer.state_dict(),
"lr_scheduler": lr_scheduler.state_dict(),
"epoch": epoch,
"config": config,
},
checkpoint_path)
最后在终端里面输入
python -m torch.distributed.launch --nproc_per_node=4 --use_env main.py --xxxparameters
其中`nproc_per_node`的意思是每张机子上有几张卡,--use_env是第一步中初始化的时候将RANK,LOCAL_RANK等添加到os.environ中的意思。
代码在https://github.com/JHW5981/MISC/tree/main/vae可见。