最近在大趋势的影响下,开始染指大模型。由于实验室计算资源的限制,需要使用单机多卡并行的方式运行程序,这里以 BLOOM-560m 模型为例,演示如何通过单机多卡DDP并行的方式微调完成下游任务。
目录
- 0. 基础篇
- - 两种分布式训练方式
- - 数据并行 & 模型并行
- 1. 程序修改
- 1.1 导入关键包
- 1.2 定义关键函数
- 1.3 程序入口
- 1.4 main() 函数
- 1.5 get_dataloader() 函数
- 1.6 train() 函数
- 1.7 validate() 函数
- 1.8 test() 函数
- 2. 程序运行
- 2.1 mp.spawn() 启动
- 2.2 tochrnn 启动
- 2.3 torch.distributed.launch() 启动
- 3. Debug 历程
- 问题一:多进程计算数据收集
- 问题二:模型加载参数缺失
- 问题三:参数类型转换异常
- 问题四:参数泄露
0. 基础篇
- 两种分布式训练方式
⚠️:Pytorch 分布式目前只支持 Linux。实现程序并行主要有 DataParallel 和 DistributedDataParallel 两种方式:
-
DataParallel (DP)
:实现简单,代码量较少,启动速度快一点。但速度较慢,且存在负载不均衡的问题。单进程,多线程。主卡显存占用比其他卡会多很多。不支持 Apex 的混合精度训练。是Pytorch官方很久之前给的一种方案。受 Python GIL 的限制,DP的操作原理是将一个batchsize的输入数据均分到多个GPU上分别计算(此处注意,batchsize要大于GPU个数才能划分)。 -
DistributedDataParallel (DDP)
:All-Reduce模式,本意是用来分布式训练(多机多卡),但是也可用于单机多卡。配置稍复杂。多进程。数据分配较均衡。是新一代的多卡训练方法。使用 torch.distributed 库实现并行。torch.distributed 库提供分布式支持,包括 GPU 和 CPU 的分布式训练支持,该库提供了一种类似 MPI 的接口,用于跨多机器网络交换张量数据。它支持几种不同的后端和初始化方法。DDP通过Ring-Reduce的数据交换方法提高了通讯效率,并通过启动多个进程的方式减轻Python GIL的限制,从而提高训练速度。 -
DDP多卡训练的原理
- 将模型在各个GPU上复制一份;
- 将总的 batch 数据等分到不同的GPU上进行计算(shuffle 顺序打乱),每个进程都从磁盘加载其自己的数据;
- 在模型训练时,损失函数的前向传播和计算在每个 GPU 上独立执行,因此,不需要收集网络输出。在反向传播期间,各个进程通过一种叫 Ring-Reduce 的方法与其他进程通讯,交换各自的梯度,从而获得所有进程的平均梯度;然后用这个值在所有 GPU 上执行梯度下降,从而每个 GPU 在反向传播结束时最终得到平均梯度的相同副本;
- 各个进程用平均后的梯度更新自己的参数,因为各个进程的初始参数、更新梯度是一致的,所以更新后的参数也是完全相同的。
- 数据并行 & 模型并行
- 数据并行是指,多张 GPUs 使用相同的模型副本,但采用同一batch中的不同数据进行训练。
- 模型并行是指,多张 GPUs 使用同一 batch 的数据,分别训练模型的不同部分。
简单来记就是:并行就是对并行对象进行拆分,以提高运算效率。
1. 程序修改
本教程使用DDP 方式完成程序并行。参考此篇教程,实现模型多卡复制和数据并行。
1.1 导入关键包
以下是程序修改过程中会使用到的包;其中,dist 负责多卡通讯,DDP 负责模型传递等工作。
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.cuda.amp import GradScaler
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
1.2 定义关键函数
- init_ddp(local_rank)
对进程进行初始化,使用 nccl 后端,并用 env 作为初始化方法。
local_rank = dist.get_rank()
world_size = dist.get_world_size()
def init_ddp(local_rank):
# 有了这一句之后,在转换device的时候直接使用 a=a.cuda()即可,否则要用a=a.cuda(local_rank)
torch.cuda.set_device(local_rank)
os.environ['RANK'] = str(local_rank)
dist.init_process_group(backend='nccl', init_method='env://')
在完成了该初始化后,可以很轻松地在需要时获得 local_rank
、world_size
,而不需要作为额外参数从 main()
函数中一层一层往下传。比如需要 print
, log
, save_model
时,由于多个进程拥有相同的副本,故只需要一个进程执行即可,示例:
if local_rank == 0:
print(f'begin validating')
......
if local_rank == 0:
save_model(actual_epoch, model, scaler, args['model_save_dir'] + '/best_macro_model_DDP_direct.pt')
- reduce_tensor(tensor)
对多个进程的计算结果进行汇总,如 loss、评价指标。
def reduce_tensor(tensor: torch.Tensor):
'''
对多个进程计算的多个 tensor 类型的 输出值取平均操作
'''
rt = tensor.clone() # tensor(9.1429, device='cuda:1')
dist.all_reduce(rt, op=dist.reduce_op.SUM)
rt /= dist.get_world_size()
return rt
- get_ddp_generator(seed)
用于训练过程中,增强训练的随机性。
def get_ddp_generator(seed=3407):
'''
对每个进程使用不同的随机种子,增强训练的随机性
'''
local_rank = dist.get_rank()
g = torch.Generator()
g.manual_seed(seed + local_rank)
return g
1.3 程序入口
在if __name__ == '__main__':
中,使用 spawn()
函数启动 DDP,该函数的主要参数包括:
- fn:需要并行的函数。这里即为
main()
函数,每个线程将执行一次该函数; - args:fn所需的参数。注意:传给fn的参数必须写成元组的形式,哪怕只有一个参数;
- nprocs:启动的进程数,默认值为1. 这里将其设置为world_size即可。nprocs的值与world_size不一致会导致进程等待同步而一直停滞。
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-args', help="priority", type=bool, required=False, default=True)
parser.add_argument('-gpu', default='0,1', type=str, help='gpu device ids for CUDA_VISIBLE_DEVICES')
parser.add_argument('-mode', help="train&test", type=str, required=False, default='train')
parser.add_argument('-requires_grad', help="whether to weight_decay", type= bool, required=False, default=True)
args = parser.parse_args()
os.environ['MASTER_ADDR'] = 'localhost' # 0号机器的IP
os.environ['MASTER_PORT'] = '19198' # 0号机器的可用端口
os.environ['CUDA_VISIBLE_DEVICES'] = args['gpu'] # 使用哪些GPU
world_size = torch.cuda.device_count()
os.environ['WORLD_SIZE'] = str(world_size)
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:128"
os.environ["TOKENIZERS_PARALLELISM"] = "false" # 指定程序在分词时不并行执行
if args['mode'] == 'train':
time_start = time.time()
mp.spawn(fn=main, args=(args, ), nprocs=world_size)
time_elapsed = time.time() - time_start
print(f'\ntime elapsed: {time_elapsed:.2f} seconds.')
elif args['mode'] == 'test':
time_start = time.time()
mp.spawn(fn=test, args=(args, ), nprocs=world_size)
time_elapsed = time.time() - time_start
print(f'\ntime elapsed: {time_elapsed:.2f} seconds.')
1.4 main() 函数
这里的 main()
函数即上面提到的 spawn()
函数中传入的第一个参数。代码关键部位修改如下:
-
参数列表更新:添加额外参数
local_rank
,该参数无需在mp.spawn()
函数中传递,系统会自动分配; -
进程初始化:调用
init_ddp()
函数实现; -
BN层同步:调用
convert_sync_batchnorm()
函数用同步的方法完成BN以尽可能模拟单卡场景,尽管会降低GPU利用率,但可以提高模型在多卡场景下的表现(详解见此篇博客);BN层同步的必要性依赖于单卡batch_size的大小,如果单卡batch_size太小,使用SyncBN可以提高性能。但是如果batch_size较大的时候就不需要使用SyncBN, 因为这需要多卡之间通信,会导致训练速度变慢。 -
数据并行:调用
DistributedDataParallel()
函数实现; -
指定混合精度训练:调用
GradScaler()
函数实现,作为参数传至train()
函数中; -
训练采样器设置:每个 epoch 设置不同的 sampling 顺序;
-
避免副本重复执行:使用
if local_rank==0:
语句进行约束; -
消除进程组:调用
destroy_process_group()
函数实现。
def main(local_rank, args): # 参数列表更新
init_ddp(local_rank) ### 进程初始化
best_macro = 0
model, tokenizer = initialise_model(args['modelname'], args['num_labels'])
model.cuda()
model = nn.SyncBatchNorm.convert_sync_batchnorm(model) # BN层同步
num_gpus = torch.cuda.device_count()
if num_gpus > 1:
print('use {} gpus!'.format(num_gpus))
model = nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank) ### 套 DDP
num_training_steps = args['num_epochs'] * (args['num_samples'] // args['batch_size']) #总的训练步数
if args['requires_grad']: # 权重衰减
param_optimizer = list(model.named_parameters())
no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight']
# 设置模型参数的权重衰减
optimizer_grouped_parameters = [
{'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)],
'weight_decay': 0.01},
{'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}
]
optimizer = AdamW(optimizer_grouped_parameters, lr=float(args['learning_rate'])) # 部分参数更新
else:
optimizer = AdamW(model.parameters(), lr=float(args['learning_rate'])) # 部分参数更新
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=100, num_training_steps=num_training_steps) #创建学习率调度器。
scaler = GradScaler() ### 用于混合精度训练
criterion = BCEWithLogitsLoss().cuda() #定义损失函数。
train_dataloader = get_dataloader(args['traincsvpath'], args, tokenizer, train=True)
valid_dataloader = get_dataloader(args['valcsvpath'], args, tokenizer, train=False)
for actual_epoch in trange(args['num_epochs'], desc="Epoch"):
if local_rank == 0: ### 防止每个进程都输出一次
print("begin training of epoch %d / %d" % (actual_epoch + 1, args['num_epochs']))
train_dataloader.sampler.set_epoch(actual_epoch) # 训练时每次的 sampling 顺序不同
train(model, train_dataloader, optimizer, scheduler, criterion, actual_epoch, scaler, args)
if local_rank == 0:
print(f'begin validating')
macro = validate(model, valid_dataloader, criterion, actual_epoch, args) #在验证集上评估模型。
if macro > best_macro:
best_macro = macro
if local_rank == 0: # 防止每个进程都保存一次
save_model(actual_epoch, model, scaler, args['model_save_dir'] + '/best_macro_model_DDP_direct.pt')
dist.destroy_process_group() # 消除进程组,和 init_process_group 相对
- 除上述修改,
main()
函数中使用到的三个函数,get_dataloader()
函数、train()
函数以及validate()
函数,也需要进行相应更新,下面分别对其进行讲解。
1.5 get_dataloader() 函数
该函数主要对 DataLoader()
函数进行了修改。对「训练」和「测试」两个阶段,分别定义 train_sampler 和 test_sampler,其中,设置 train_sampler 为随机采样,test_sampler 为顺序采样。此外,在「训练」阶段,使用 get_ddp_generator()
函数向 DataLoader()
函数传入参数 generator
(作用于不同worker),否则会减弱训练的随机性。
def get_dataloader(path, args, tokenizer, train:bool):
'''
根据给定的路径获取数据,并将数据和训练标志传递给数据加载器,这样可以方便地从给定路径加载数据并生成数据加载器,以供后续的模型训练和评估使用。
path:数据存放路径
tokenizer:分词器
train:是否是训练阶段
'''
texts, labels = load_dataset(path, args['num_labels'])
texts = tokenizer(texts, padding='max_length', truncation=True, return_tensors='pt', max_length=args['max_length'])
data = TensorDataset(texts['input_ids'], texts['attention_mask'], torch.tensor(labels))
if train:
train_sampler = DistributedSampler(data, shuffle=True) # #创建一个随机采样器。
g = get_ddp_generator()
dataloader = DataLoader(dataset=data,
batch_size=args['batch_size'],
num_workers=args['num_workers'],
pin_memory=True,
shuffle=False,
sampler=train_sampler, #采用随机采样器。
generator=g)
else:
test_sampler = DistributedSampler(data, shuffle=False) #创建一个顺序采样器。
dataloader = DataLoader(dataset=data,
batch_size=args['batch_size'],
num_workers=args['num_workers'],
pin_memory=True,
shuffle=False,
sampler=test_sampler #采用顺序采样器。
)
return dataloader
1.6 train() 函数
该函数主要通过 reduce_tensor()
函数对loss进行了取均值操作,并对反向传播的方式进行了修改 —— 通过scaler 对梯度进行缩放,防止由于使用混合精度导致损失下溢,并且对scaler自身的状态进行更新。多个并行进程共用同一个scaler。在模型保存过程中,如果后续需要继续训练(比如预训练-微调模式),最好将scaler 的状态一起保留,并在后续的微调过程中和模型的参数一同加载。
def train(model, train_dataloader, optimizer, scheduler, criterion, actual_epoch, scaler, args):
model.train()
tr_loss = 0
num_train_samples = 0
for step, batch in enumerate(train_dataloader):
batch = tuple(t.cuda(non_blocking=True) for t in batch)
b_input_ids, b_input_mask, b_labels = batch
with torch.cuda.amp.autocast():
output = model(b_input_ids, attention_mask=b_input_mask, labels=b_labels) # 运行到这一行会增加一下显存
loss = criterion(output.logits.view(-1,args['num_labels']), b_labels.type_as(output.logits).view(-1,args['num_labels']))
reduced_loss = reduce_tensor(loss.data) # 对并行进程计算的多个 loss 取平均
if dist.get_rank() == 0: # 防止重复输出
print("\nOutput Loss: ", reduced_loss.item())
tr_loss += reduced_loss.item()
# 并行状态下的更新,不同进程分别根据自己计算的 loss 更新数据
optimizer.zero_grad()
scaler.scale(loss).backward()
scaler.step(optimizer) # 运行到这一行会增加一下显存
# 下面四行,多个进程只执行一次
scheduler.step()
scaler.update()
num_train_samples += b_labels.size(0) #将批次中的样本数量添加到 num_train_samples 中。
torch.cuda.empty_cache() # 释放GPU reserved memory显存
epoch_train_loss = tr_loss / num_train_samples # num_train_samples 代表每个进程承接的样本数量,由于上面已经有对loss取平均的操作,这里分母无需再乘以进程数
if dist.get_rank() == 0:
print("\nTrain loss after Epoch {} : {}".format(actual_epoch, epoch_train_loss))
1.7 validate() 函数
@torch.no_grad()
def validate(model, valid_dataloader, criterion, epoch, args, threshold=0.5):
model.eval()
eval_loss = 0.0
num_eval_samples = 0
pred_labels = []
true_labels = []
for step, batch in enumerate(valid_dataloader):
batch = tuple(t.cuda(non_blocking=True) for t in batch)
b_input_ids, b_input_mask, b_labels = batch
with torch.no_grad():
with torch.cuda.amp.autocast():
output = model(b_input_ids, attention_mask=b_input_mask)
logits = output.logits
loss = criterion(logits.view(-1,args['num_labels']), b_labels.type_as(logits).view(-1,args['num_labels']))
reduced_loss = reduce_tensor(loss.data)
eval_loss += reduced_loss.item()
pred_label = torch.sigmoid(logits)
pred_label = pred_label.to('cpu').numpy()
b_labels = b_labels.to('cpu').numpy()
pred_labels.append(pred_label)
true_labels.append(b_labels)
num_eval_samples += b_labels.shape[0] # 这里是针对单个 进程 的 计算样本数
epoch_eval_loss = eval_loss/num_eval_samples
if dist.get_rank() == 0:
print("Validation loss after Epoch {} : {}".format(epoch, epoch_eval_loss))
# 每个并行进程都会分别执行下列计算操作,得到各进程对应的macro评价指标
pred_labels = [item for sublist in pred_labels for item in sublist]
true_labels = [item for sublist in true_labels for item in sublist]
pred_bools = [pl>threshold for pl in pred_labels]
true_bools = [tl==1 for tl in true_labels]
macro = f1_score(true_bools, pred_bools, average='macro')
# 汇总不同进程的实验结果
macro = reduce_tensor(torch.tensor(macro).cuda())
return macro
1.8 test() 函数
由1.3节可知,我这里的程序是将「训练&验证」与「测试」过程分开,前一阶段保存模型,后一阶段对模型进行验证。所以单独来介绍一下 test()
函数需要修改的内容,这一部分涉及到checkpoint模型加载。加速推理方法详见此篇博客。
@torch.no_grad()
def test(local_rank, args):
init_ddp(local_rank) # 进程初始化
pred_labels = []
true_labels = []
if local_rank == 0:
print(f'begin testing')
save_path = args['model_save_dir'] + '/best_macro_model_DDP_direct.pt'
model, tokenizer = load_model(save_path, args['modelname'], args['num_labels'])
model.cuda()
model = nn.SyncBatchNorm.convert_sync_batchnorm(model) ### 转换模型的 BN 层
num_gpus = torch.cuda.device_count()
if num_gpus > 1 and local_rank == 0:
print('use {} gpus!'.format(num_gpus))
model = nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank) ### 套 DDP
model.eval()
test_dataloader = get_dataloader(args['testcsvpath'], args, tokenizer, train=False)
for idx, batch in enumerate(test_dataloader): #遍历测试集的数据加载器。
......
dist.destroy_process_group() # 消除进程组
到这里,程序就全部修改完毕啦!
2. 程序运行
下面分别介绍 DDP 的几种多卡启动方式。
2.1 mp.spawn() 启动
本程序采用的启动方式是 mp.spawn() 函数,其中mp模块完成对multiprocessing库进行封装,并没有特定针对DDP。
一开始,使用两张 2080 Ti 显卡并行运行程序,然而发现在第 0 个Epoch刚刚启动不久,总是报错 RuntimeError: CUDA out of memory.
,如下:
Traceback (most recent call last):
File "/data/CMLTES_codes/experiment/bloom/BLOOM_DDP.py", line 690, in <module>
mp.spawn(main, args=(args, ), nprocs=world_size)
File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 240, in spawn
return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 198, in start_processes
while not context.join():
File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 160, in join
raise ProcessRaisedException(msg, error_index, failed_process.pid)
torch.multiprocessing.spawn.ProcessRaisedException:
-- Process 1 terminated with the following error:
Traceback (most recent call last):
File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 69, in _wrap
fn(i, *args)
File "/data/CMLTES_codes/experiment/bloom/BLOOM_DDP.py", line 603, in main
epoch_train_loss = train(model, train_dataloader, optimizer, scheduler, loss_func, actual_epoch, scaler, args)
File "/data/CMLTES_codes/experiment/bloom/BLOOM_DDP.py", line 336, in train
scaler.scale(loss).backward() ###
File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/_tensor.py", line 396, in backward
torch.autograd.backward(self, gradient, retain_graph, create_graph, inputs=inputs)
File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/autograd/__init__.py", line 173, in backward
Variable._execution_engine.run_backward( # Calls into the C++ engine to run the backward pass
File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/autograd/function.py", line 253, in apply
return user_fn(self, *args)
File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/transformers/models/bloom/modeling_bloom.py", line 188, in backward
tmp = bloom_gelu_back(grad_output, input)
File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/transformers/models/bloom/modeling_bloom.py", line 175, in bloom_gelu_back
ff = 0.5 * x * ((1 - tanh_out * tanh_out) * (0.79788456 + 0.1070322243 * x * x)) + 0.5 * (1 + tanh_out)
RuntimeError: CUDA out of memory. Tried to allocate 32.00 MiB (GPU 1; 10.76 GiB total capacity; 8.83 GiB already allocated; 28.56 MiB free; 8.94 GiB reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation. See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF
百思不得其解后,尝试把程序原封不动放到3090上面运行,发现可以正常运行了!这里的经验教训就是GPU 单卡显存对于并行也很重要!一个1.2G左右的模型微调时大约需要40G左右的中间变量来进行反向传播……这是我属实没有想到的情况……
2.2 tochrnn 启动
相较于使用 mp.spawn() 启动,torchrun 会自动控制一些环境变量的设置,因而更为方便。我们只需要设置os.environ[‘CUDA_VISIBLE_DEVICES’] 即可(不设置默认为该机器上的所有GPU),而无需设置os.environ[‘MASTER_ADDR’] 等。此外,main() 函数不再需要 local_rank 参数。程序入口变为:
if __name__ == '__main__':
......
time_start = time.time()
main(args)
time_elapsed = time.time() - time_start
local_rank = int(os.environ['LOCAL_RANK'])
if local_rank == 0:
print(f'\ntime elapsed: {time_elapsed:.2f} seconds')
运行脚本的命令由python变为了torchrun,如下:
torchrun --standalone --nproc_per_node=2 ddp_main_torchrun.py --gpu 0,1
程序能够成功运行之后,还有一些细节问题,下面一一来进行解决。
2.3 torch.distributed.launch() 启动
这种方式代码量更少,启动速度更快。
python -m torch.distributed.launch --nproc_per_node 8 xxx.py
# -m 意思是 run library module as a script
# -nproc_per_node 表示每台机器的进程数
PS:这种方式要被淘汰了:
/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/distributed/launch.py:178: FutureWarning: The module torch.distributed.launch is deprecated
and will be removed in future. Use torchrun.
3. Debug 历程
下面对使用DDP期间遇到的问题进行分析并给出解决办法。
问题一:多进程计算数据收集
由于我这里是将模型复制到双卡上实现数据并行,所以在汇总结果时,需要将不同进程上的数据进行汇总取均值计算。这时就需要用到 1.2 节提到的 all_reduce()
收集函数。
这里要注意⚠️:对于 float 等非张量型数据,如果我们想对其计算多进程的平均值,可以先使用 torch.tensor() 将需要汇总的变量转为 tensor 并使用 .cuda()
命令将其放至 gpu 上,然后调用 all_reduce()
收集函数。详见 1.7 节 validate()
函数中 macro
变量的收集计算。若没有完成数据转换,则会报错如下:
衍生问题:在进行反向传播时,每个进程使用的训练数据是不同的,所以还是需要根据自己当前计算的 loss 分别更新,而不是根据收集函数得到的 loss 值进行更新,否则会报错,也不合逻辑。
问题二:模型加载参数缺失
在 1.4节 main() 函数中,使用「只保存模型参数」的方式存储模型。在测试阶段,用对应方式加载模型时,报错如下:
(CMLTES) ➜ CMLTES git:(master) ✗ python /data/gluo/CMLTES/codes/BLOOM_DDP.py -mode "test"
Model directory for bloom and batch size 4 already exists!
TEST FOR bloom and Batch Size4
[W socket.cpp:558] [c10d] The client socket has failed to connect to [localhost]:19198 (errno: 99 - Cannot assign requested address).
begin testing
Some weights of BloomForSequenceClassification were not initialized from the model checkpoint at /data/gluo/CMLTES/bloom_PRE and are newly initialized: ['score.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of BloomForSequenceClassification were not initialized from the model checkpoint at /data/gluo/CMLTES/bloom_PRE and are newly initialized: ['score.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Traceback (most recent call last):
File "/data/gluo/CMLTES/codes/BLOOM_DDP.py", line 586, in <module>
mp.spawn(test, args=(args, ), nprocs=world_size)
File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 240, in spawn
return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 198, in start_processes
while not context.join():
File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 160, in join
raise ProcessRaisedException(msg, error_index, failed_process.pid)
torch.multiprocessing.spawn.ProcessRaisedException:
-- Process 0 terminated with the following error:
Traceback (most recent call last):
File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 69, in _wrap
fn(i, *args)
File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/autograd/grad_mode.py", line 27, in decorate_context
return func(*args, **kwargs)
File "/data/gluo/CMLTES/codes/BLOOM_DDP.py", line 450, in test
model, tokenizer = load_model(save_path, args['modelname'], args['num_labels']) #加载模型。
File "/data/gluo/CMLTES/codes/BLOOM_DDP.py", line 95, in load_model
model.load_state_dict(model_state_dict) #, strict=False) #加载模型的参数。
File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1604, in load_state_dict
raise RuntimeError('Error(s) in loading state_dict for {}:\n\t{}'.format(
RuntimeError: Error(s) in loading state_dict for BloomForSequenceClassification:
Missing key(s) in state_dict: "transformer.word_embeddings.weight", "transformer.word_embeddings_layernorm.weight", "transformer.word_embeddings_layernorm.bias", "transformer.h.0.input_layernorm.weight", "transformer.h.0.input_layernorm.bias", "transformer.h.0.self_attention.query_key_value.weight", "transformer.h.0.self_attention.query_key_value.bias", "transformer.h.0.self_attention.dense.weight", "transformer.h.0.self_attention.dense.bias",
根据此篇博客,这里暂时的处理方法是:将 load_state_dict()
函数修改为:model.load_state_dict(model_state_dict, strict=False)
,即设置 strict
参数值为 False
. strict=False
的含义是:不严格要求 state_dict
中的键与该模块的键返回的键匹配。
上述处理方式可以暂时忽略上述参数缺失问题,但是可能会对模型的性能造成一定程度的影响,这一问题有待后续解决。
PS:关于模型保存与加载的两种方法
根据此篇博客,保存模型有两种方式,一是全量保存模型的全部信息,二是只保存模型的参数,两种保存方式对应的模型加载方式自然也有所差别。
- 保存模型的全部信息
# 保存模型
checkpoint = {'model': model,\
'scaler': scaler
}
torch.save(checkpoint, save_path)
# 加载模型
checkpoint = torch.load(save_path)
model = checkpoint['model'] # 加载模型
- 只保存模型参数
与第一种方式不同的是,这种方式在加载模型时,需要首先定义与保存的模型相同的模型结构,然后加载模型参数。
# 保存模型
checkpoint = {'state_dict': model.state_dict(),\
'scaler': scaler.state_dict()
}
torch.save(checkpoint, save_path)
# 加载模型
checkpoint = torch.load(save_path, map_location=torch.device('cpu'))
model_state_dict = checkpoint['state_dict']
model.load_state_dict(model_state_dict) #, strict=False) #加载模型参数。
问题三:参数类型转换异常
在 1.4节 main() 函数中,使用「只保存模型参数」的方式存储模型。在测试阶段,用对应方式加载模型时,报错如下:
使用上述方法解决加载模型参数缺失的问题后,随之而来的问题如下所示。
Traceback (most recent call last):
File "/data/gluo/CMLTES/codes/BLOOM_DDP.py", line 587, in <module>
mp.spawn(test, args=(args, ), nprocs=world_size)
File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 240, in spawn
return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 198, in start_processes
while not context.join():
File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 160, in join
raise ProcessRaisedException(msg, error_index, failed_process.pid)
torch.multiprocessing.spawn.ProcessRaisedException:
-- Process 0 terminated with the following error:
Traceback (most recent call last):
File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 69, in _wrap
fn(i, *args)
File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/autograd/grad_mode.py", line 27, in decorate_context
return func(*args, **kwargs)
File "/data/gluo/CMLTES/codes/BLOOM_DDP.py", line 459, in test
model = nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank) ### 套 DDP
File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/nn/parallel/distributed.py", line 646, in __init__
_verify_param_shape_across_processes(self.process_group, parameters)
File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/distributed/utils.py", line 89, in _verify_param_shape_across_processes
return dist._verify_params_across_processes(process_group, tensors, logger)
RuntimeError: value cannot be converted to type int without overflow
深度原因有待后续探索。
问题四:参数泄露
报错日志:UserWarning: resource_tracker: There appear to be 1 leaked semaphore objects to clean up at shutdown warnings.warn('resource_tracker: There appear to be %d '
由上图可知,上述警告产生的原因是使用了 Ctrl+C
中断程序。深度原因有待后续探索。
注意⚠️:在使用PyTorch设置多线程进行数据读取时,后台实际操作情况是开了N个PID连号的子进程模拟多线程工作,所以在程序跑完或者中途kill掉主进程的话,子进程的GPU显存并不会被释放,需要手动一个一个kill才行。
> 本篇博客没有涉及到的知识点:dist.barrier()、Gradient Accumulation、Apex 实现混合精度训练&分布式训练、
后记:本篇博客是我经过不断探索总结而得,其中若有表述不当或表意不明之处,还望各位不吝赐教,我们共同进步!
参考文献
- 一看就懂的DDP代码实践 - 知乎 (zhihu.com)
- Pytorch DistributedDataParallel简明使用指南 - 知乎 (zhihu.com)
- PyTorch DistributedDataParallel 单机多卡训练 踩坑记录
- pytorch保存模型的两种方式_SCU-JJkinging的博客-CSDN博客
- 加载模型出现 RuntimeError: Error(s) in loading state_dict for Model: Missing key(s) in state_dict_sovits 加载浅扩散模型error_大海Git的博客-CSDN博客
- pytorch中model.to(device)和map_location=device的区别_绛洞花主敏明的博客-CSDN博客
- RuntimeError: CUDA out of memory.一些调bug路程 - 知乎 (zhihu.com)
- pytorch 分布式计算 你们都遇到过哪些 坑/bug? - 知乎 (zhihu.com)
- 关于pytorch中的distributedsampler函数使用_DRACO于的博客-CSDN博客
- 通过设置PYTORCH_CUDA_ALLOC_CONF中的max_split_size_mb解决Pytorch的显存碎片化导致的CUDA:Out Of Memory问题_梦音Yune的博客-CSDN博客
- torch.cuda.amp.autocast()使用示例_生成滞涨网络~的博客-CSDN博客
- 可能99%人犯的PyTorch错误 set_seed 会破坏随机性,官方 worker_init_fn 无法解决 - 知乎 (zhihu.com)
- 原创 深度 PyTorch DDP系列第三篇:实战与技巧 - 知乎 (zhihu.com)
- torch.distributed_Wanderer001的博客-CSDN博客
以下系列资源均来自此博主,可以说是关于数据并行十分详细的教程了!
- Pytorch(十一) —— 分布式(多GPU/多卡)训练 并行 (DP & DDP)_pytorch gpu 分布式_hxxjxw的博客-CSDN博客
- PyTorch多卡/多GPU/分布式DPP的基本概念(node&rank&local_rank&nnodes&node_rank&nproc_per_node&world_size)_hxxjxw的博客-CSDN博客
- torch.distributed多卡/多GPU/分布式DPP(一) —— torch.distributed.launch & all_gather & init_process_group_hxxjxw的博客-CSDN博客
- torch.distributed多卡/多GPU/分布式DPP(二)—torch.distributed.all_reduce(reduce_mean)barrier控制进程执行顺序&seed随机种子_torch dpp_hxxjxw的博客-CSDN博客
- Pytorch分布式训练/多卡训练DDP——模型初始化(torch.distribute 与 DDP的区别)_pytorch distribute torchtrun-CSDN博客
- 多卡训练中的BN(BatchNorm)_多卡 batchnorm_hxxjxw的博客-CSDN博客
- 为什么Pytorch多卡训练容易导致GPU显存不释放_hxxjxw的博客-CSDN博客
- Pytorch分布式训练/多卡训练(一) —— Data Parallel并行(DP)_model = nn.dataparallel(model, device_ids=[0, 1])_hxxjxw的博客-CSDN博客
- Pytorch分布式训练/多卡训练(二) —— Data Parallel并行(DDP)(2.1)(基本概念&代码框架)_slurm_procid_hxxjxw的博客-CSDN博客
- Pytorch分布式训练/多卡训练(二) —— Data Parallel并行(DDP)(2.2)(代码示例)(BN同步&主卡保存&梯度累加&多卡测试inference&随机种子seed)_ddp程序中的seed_hxxjxw的博客-CSDN博客
- Pytorch分布式训练/多卡训练(二) —— Data Parallel并行(DDP)(2.3)(torch.multiprocessing(spawn) & Apex)_torch.multiprocessing.spawn_hxxjxw的博客-CSDN博客
- Pytorch分布式训练/多卡训练(三) —— Model Parallel 模型并行_hxxjxw的博客-CSDN博客