【深度学习】单机多卡 | DataParallel将计算任务在多个 GPU 上并行执行,可以在多个 GPU 上分摊工作负载,从而加快训练速度
- 写在最前面
- DataParallel (DP) 简介
- 使用 DataParallel 的场景
- 使用 DataParallel 的基本步骤
- 代码部分
- train.py
- 简单的代码示例
- 代码解析
- DataParallel 的局限性
- 小结
写在最前面
希望在单机多卡的模式下运行我的模型代码,加快训练速度。
请教吕博:如何更改代码?
其中,提到模型先用DP方式运行
DP是什么?又被学到了一个知识点。
在深度学习和分布式计算领域,DP 通常指的是 DataParallel
。DataParallel
是一种将计算任务在多个 GPU 上并行执行的方法。它在单机多卡环境中非常有用,可以在多个 GPU 上分摊工作负载,从而加快训练速度。
DataParallel (DP) 简介
torch.nn.DataParallel
是 PyTorch 中的一个工具,可以让模型在多个 GPU 上并行运行。它通过将输入批次拆分成多个子批次,每个子批次发送到不同的 GPU 上,并行执行前向传播和反向传播,然后将每个 GPU 上的梯度聚合到主 GPU 上进行参数更新。
使用 DataParallel 的场景
- 单机多卡训练: 当你有一台机器配备了多块 GPU,并希望利用所有的 GPU 资源来加速模型训练时,
DataParallel
是一个简单而有效的解决方案。 - 简化代码: 相比于更复杂的分布式训练方案,
DataParallel
提供了一种较为简化的方式来实现多 GPU 并行训练,通常只需要对模型进行简单包装。
使用 DataParallel 的基本步骤
- 定义模型: 创建你的神经网络模型。
- 包装模型: 使用
torch.nn.DataParallel
包装你的模型。 - 将模型和数据迁移到 GPU: 使用
.to(device)
将模型和输入数据迁移到合适的设备上。 - 训练模型: 按照常规方式训练模型。
代码部分
train.py
仅展示相关部分
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"
# import部分省略
def evaluate(model, device, dataloader):
model.eval()
total_loss, total_step = 0.0, 0.0
# 使用with torch.no_grad()来禁用梯度计算
with torch.no_grad():
# 对dataloader中的每个batch进行遍历
for step, batch in enumerate(dataloader):
# 将batch中的数据移动到指定设备上
batch = tuple(t.to(device) for t in batch)
input_ids, attention_mask, decoder_input_ids, decoder_attention_mask, labels = batch
# 通过模型进行前向传播,并获取输出结果
outputs = model(input_ids, attention_mask=attention_mask, decoder_input_ids=decoder_input_ids,
decoder_attention_mask=decoder_attention_mask, labels=labels)
# 获取模型输出结果中的loss值
loss = outputs['loss']
# 累加总损失和总步数
total_loss += loss.item()
total_step += 1
# 返回总损失和None
return total_loss / total_step, None
# 设置随机数种子、日志路径
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 初始化tokenizer、添加特殊的tokens
# define dataloader 定义数据加载器
# 每批的个数4/梯度累计个数4
batch_size = int(args.batch_size / args.gradient_accumulation_steps)
# from processing.dataset import BartDataset
# return input_ids, attention_mask, decoder_input_ids, decoder_attention_mask, labels
trian_dataset = BartDataset(tokenizer, args, mode='train')
train_dataloader = DataLoader(
dataset=trian_dataset,
batch_size=batch_size,
shuffle=True,
collate_fn=trian_dataset.collate_fn,
num_workers=20 # 优化数据加载
)
eval_dataset = BartDataset(tokenizer, args, mode='test')
eval_dataloader = DataLoader(
dataset=eval_dataset,
batch_size=batch_size,
shuffle=False,
collate_fn=eval_dataset.collate_fn,
num_workers=20 # 优化数据加载
)
# define model 实例化模型
# 检查GPU数量并设置DataParallel
if torch.cuda.device_count() > 1:
print(f"Using {torch.cuda.device_count()} GPUs")
net = nn.DataParallel(model)
else:
print("Using single GPU or CPU")
net = model
# define criterion 定义损失函数
# define optimizer优化器
# 参考:https://blog.csdn.net/hottie_xiaomiao/article/details/124392847
# 打印每一次迭代元素的名字和param
param_optimizer = list(model.named_parameters())
no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight']
# 参数组:每组参数可以指定自己的优化器参数,即可使用不同的优化策略
optimizer_grouped_parameters = [
{'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)], 'weight_decay': 0.01},
{'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}
]
optimizer = torch.optim.AdamW(optimizer_grouped_parameters, lr=args.bart_lr)
# 计算总步数
total_steps = int(len(trian_dataset) * args.epochs / args.gradient_accumulation_steps)
# 初始化学习率调整器
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=int(args.warmup_steps * total_steps),
num_training_steps=total_steps)
# Begin training
# logger部分省略
# 定义一个初始的最好评估损失,即正无穷大
best_eval_loss = float('inf')
# 定义当前的步骤和当前损失
current_step, current_loss = 0, 0
# 定义全局步骤数
global_step = 0
# 对于每个 epoch 进行循环
for epoch in range(args.epochs):
# 将模型设置为训练模式
model.train()
# 对训练数据集进行循环
for step, batch in enumerate(train_dataloader):
# 将batch中的每一个tensor都移动到指定的设备上(如GPU)
batch = tuple(t.to(device) for t in batch)
# 从batch中获取输入,注意这里的命名方式要和模型中forward函数中的输入命名相同
input_ids, attention_mask, decoder_input_ids, decoder_attention_mask, labels = batch
# 将输入传递给模型进行前向计算
outputs = model(input_ids, attention_mask=attention_mask, decoder_input_ids=decoder_input_ids,
decoder_attention_mask=decoder_attention_mask, labels=labels)
# TODO:1
# print(outputs)
# 获取模型的损失
loss = outputs['loss']
# 记录当前损失和步骤数,用于计算平均损失
current_loss += loss.item()
current_step += 1
# 如果使用了梯度累积,则将损失除以累积步骤数
if args.gradient_accumulation_steps > 1:
loss = loss / args.gradient_accumulation_steps
# 反向传播计算梯度
loss.backward()
# 将梯度进行裁剪,以防止梯度爆炸
clip_grad_norm_(model.parameters(), args.max_clip_norm)
# 如果达到了梯度累积的步骤数,则进行一次优化更新
if (step + 1) % args.gradient_accumulation_steps == 0:
optimizer.step()
scheduler.step()
optimizer.zero_grad()
global_step += 1
# 如果当前步骤是一个log间隔的倍数,则打印日志信息,清空当前步骤和当前损失
# 在训练完一个 epoch 后,对模型在验证集上进行评估
eval_loss, _ = evaluate(model, device, eval_dataloader)
logger.info("Eval loss: {:.6f}, the best loss: {:.6f}".format(eval_loss, best_eval_loss))
# 如果当前的评估损失比之前的最好评估损失更小,则更新最好评估损失
if eval_loss < best_eval_loss:
best_eval_loss = eval_loss
# 创建一个输出目录,用于存储模型的输出
# 在日志中输出检查点保存路径,将模型、tokenizer、args的设置保存到检查点路径中
简单的代码示例
以下是使用 DataParallel
在多 GPU 上运行模型的一个简单示例:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
# 设置环境变量,指定使用的GPU
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,6,7"
# 定义设备
globalDevice = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 定义一个简单的CNN模型
class CNN(nn.Module):
def __init__(self):
super(CNN, self).__init__()
self.conv = nn.Conv2d(3, 16, 3, 1)
self.fc = nn.Linear(16 * 26 * 26, 10)
def forward(self, x):
x = self.conv(x)
x = torch.relu(x)
x = x.view(x.size(0), -1)
x = self.fc(x)
return x
# 实例化模型
cnn = CNN().to(globalDevice)
# 检查GPU数量并设置DataParallel
if torch.cuda.device_count() > 1:
print(f"Using {torch.cuda.device_count()} GPUs")
net = nn.DataParallel(cnn)
else:
print("Using single GPU or CPU")
net = cnn
# 定义数据集和数据加载器
class SimpleDataset(Dataset):
def __init__(self, size):
self.size = size
def __len__(self):
return self.size
def __getitem__(self, idx):
return torch.randn(3, 28, 28), torch.tensor(1)
dataset = SimpleDataset(1000)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True, num_workers=20)
# 定义优化器和损失函数
optimizer = optim.SGD(net.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()
# 简单的训练过程
for epoch in range(args.epochs):
for inputs, labels in dataloader:
inputs, labels = inputs.to(globalDevice), labels.to(globalDevice)
optimizer.zero_grad()
outputs = net(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
print(f"Epoch {epoch+1}, Loss: {loss.item()}")
代码解析
-
环境变量设置:
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,6,7"
指定要使用的 GPU。
-
定义设备:
globalDevice = torch.device("cuda" if torch.cuda.is_available() else "cpu")
根据是否有可用的 GPU 设置设备。
-
定义模型:
class CNN(nn.Module): ...
-
包装模型:
if torch.cuda.device_count() > 1: net = nn.DataParallel(cnn) else: net = cnn
如果检测到多个 GPU,使用
DataParallel
包装模型。 -
数据加载器:
dataloader = DataLoader(dataset, batch_size=64, shuffle=True, num_workers=20)
使用
num_workers
参数优化数据加载。 -
训练过程:
for epoch in range(2): ...
DataParallel 的局限性
- 数据并行粒度:
DataParallel
进行的是数据并行操作,每个 GPU 处理一部分数据批次。这可能导致 GPU 利用率不均衡,尤其是在有计算负载差异的情况下。 - 单节点限制:
DataParallel
主要用于单节点多 GPU。如果需要跨节点并行(分布式训练),应该考虑使用torch.nn.parallel.DistributedDataParallel
。
小结
DataParallel
是 PyTorch 提供的一种简单易用的多 GPU 并行方法,适合单节点多卡训练。通过这种方法,可以在多个 GPU 上分摊计算任务,提高训练速度和效率。对于更复杂的分布式计算任务,可以考虑使用 DistributedDataParallel
。
欢迎大家添加好友交流。