「大模型微调」使用 DDP 实现程序单机多卡并行指南

news2025/4/17 2:52:01

最近在大趋势的影响下,开始染指大模型。由于实验室计算资源的限制,需要使用单机多卡并行的方式运行程序,这里以 BLOOM-560m 模型为例,演示如何通过单机多卡DDP并行的方式微调完成下游任务。

目录

  • 0. 基础篇
    • - 两种分布式训练方式
    • - 数据并行 & 模型并行
  • 1. 程序修改
    • 1.1 导入关键包
    • 1.2 定义关键函数
    • 1.3 程序入口
    • 1.4 main() 函数
    • 1.5 get_dataloader() 函数
    • 1.6 train() 函数
    • 1.7 validate() 函数
    • 1.8 test() 函数
  • 2. 程序运行
    • 2.1 mp.spawn() 启动
    • 2.2 tochrnn 启动
    • 2.3 torch.distributed.launch() 启动
  • 3. Debug 历程
    • 问题一:多进程计算数据收集
    • 问题二:模型加载参数缺失
    • 问题三:参数类型转换异常
    • 问题四:参数泄露


0. 基础篇

- 两种分布式训练方式

⚠️:Pytorch 分布式目前只支持 Linux。实现程序并行主要有 DataParallel 和 DistributedDataParallel 两种方式:

  • DataParallel (DP):实现简单,代码量较少,启动速度快一点。但速度较慢,且存在负载不均衡的问题。单进程,多线程。主卡显存占用比其他卡会多很多。不支持 Apex 的混合精度训练。是Pytorch官方很久之前给的一种方案。受 Python GIL 的限制,DP的操作原理是将一个batchsize的输入数据均分到多个GPU上分别计算(此处注意,batchsize要大于GPU个数才能划分)。

  • DistributedDataParallel (DDP):All-Reduce模式,本意是用来分布式训练(多机多卡),但是也可用于单机多卡。配置稍复杂。多进程。数据分配较均衡。是新一代的多卡训练方法。使用 torch.distributed 库实现并行。torch.distributed 库提供分布式支持,包括 GPU 和 CPU 的分布式训练支持,该库提供了一种类似 MPI 的接口,用于跨多机器网络交换张量数据。它支持几种不同的后端和初始化方法。DDP通过Ring-Reduce的数据交换方法提高了通讯效率,并通过启动多个进程的方式减轻Python GIL的限制,从而提高训练速度。

  • DDP多卡训练的原理

    1. 将模型在各个GPU上复制一份;
    2. 将总的 batch 数据等分到不同的GPU上进行计算(shuffle 顺序打乱),每个进程都从磁盘加载其自己的数据;
    3. 在模型训练时,损失函数的前向传播和计算在每个 GPU 上独立执行,因此,不需要收集网络输出。在反向传播期间,各个进程通过一种叫 Ring-Reduce 的方法与其他进程通讯,交换各自的梯度,从而获得所有进程的平均梯度;然后用这个值在所有 GPU 上执行梯度下降,从而每个 GPU 在反向传播结束时最终得到平均梯度的相同副本;
    4. 各个进程用平均后的梯度更新自己的参数,因为各个进程的初始参数、更新梯度是一致的,所以更新后的参数也是完全相同的。

- 数据并行 & 模型并行

  • 数据并行是指,多张 GPUs 使用相同的模型副本,但采用同一batch中的不同数据进行训练。
  • 模型并行是指,多张 GPUs 使用同一 batch 的数据,分别训练模型的不同部分。

简单来记就是:并行就是对并行对象进行拆分,以提高运算效率。


1. 程序修改

本教程使用DDP 方式完成程序并行。参考此篇教程,实现模型多卡复制和数据并行。

1.1 导入关键包

以下是程序修改过程中会使用到的包;其中,dist 负责多卡通讯,DDP 负责模型传递等工作。

import torch.distributed as dist
import torch.multiprocessing as mp
from torch.cuda.amp import GradScaler
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP

1.2 定义关键函数

  • init_ddp(local_rank)

对进程进行初始化,使用 nccl 后端,并用 env 作为初始化方法。

local_rank = dist.get_rank()
world_size = dist.get_world_size()

def init_ddp(local_rank):
    # 有了这一句之后,在转换device的时候直接使用 a=a.cuda()即可,否则要用a=a.cuda(local_rank)
    torch.cuda.set_device(local_rank)
    os.environ['RANK'] = str(local_rank)
    dist.init_process_group(backend='nccl', init_method='env://')

在完成了该初始化后,可以很轻松地在需要时获得 local_rankworld_size,而不需要作为额外参数从 main() 函数中一层一层往下传。比如需要 print, log, save_model时,由于多个进程拥有相同的副本,故只需要一个进程执行即可,示例:

if local_rank == 0:
    print(f'begin validating')  

......

if local_rank == 0:
    save_model(actual_epoch, model, scaler, args['model_save_dir'] + '/best_macro_model_DDP_direct.pt')
  • reduce_tensor(tensor)

对多个进程的计算结果进行汇总,如 loss、评价指标。

def reduce_tensor(tensor: torch.Tensor):
    '''
    对多个进程计算的多个 tensor 类型的 输出值取平均操作
    '''
    rt = tensor.clone()  # tensor(9.1429, device='cuda:1')
    dist.all_reduce(rt, op=dist.reduce_op.SUM)
    rt /= dist.get_world_size()
    return rt
  • get_ddp_generator(seed)

用于训练过程中,增强训练的随机性。

def get_ddp_generator(seed=3407):
    '''
    对每个进程使用不同的随机种子,增强训练的随机性
    '''
    local_rank = dist.get_rank()
    g = torch.Generator()
    g.manual_seed(seed + local_rank)
    return g

1.3 程序入口

if __name__ == '__main__':中,使用 spawn() 函数启动 DDP,该函数的主要参数包括:

  1. fn:需要并行的函数。这里即为 main() 函数,每个线程将执行一次该函数;
  2. args:fn所需的参数。注意:传给fn的参数必须写成元组的形式,哪怕只有一个参数;
  3. nprocs:启动的进程数,默认值为1. 这里将其设置为world_size即可。nprocs的值与world_size不一致会导致进程等待同步而一直停滞。
if __name__ == '__main__':

    parser = argparse.ArgumentParser()
    parser.add_argument('-args', help="priority", type=bool, required=False, default=True)
    parser.add_argument('-gpu', default='0,1', type=str, help='gpu device ids for CUDA_VISIBLE_DEVICES')
    parser.add_argument('-mode', help="train&test", type=str, required=False, default='train')
    parser.add_argument('-requires_grad', help="whether to weight_decay", type= bool, required=False, default=True)
    args = parser.parse_args()
    
    os.environ['MASTER_ADDR'] = 'localhost'  # 0号机器的IP
    os.environ['MASTER_PORT'] = '19198'  # 0号机器的可用端口
    os.environ['CUDA_VISIBLE_DEVICES'] = args['gpu']  # 使用哪些GPU
    world_size = torch.cuda.device_count()
    os.environ['WORLD_SIZE'] = str(world_size)
    os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:128"
    os.environ["TOKENIZERS_PARALLELISM"] = "false"  # 指定程序在分词时不并行执行
    
    if args['mode'] == 'train':
        time_start = time.time()
        mp.spawn(fn=main, args=(args, ), nprocs=world_size)
        time_elapsed = time.time() - time_start
        print(f'\ntime elapsed: {time_elapsed:.2f} seconds.')

    elif args['mode'] == 'test':  
        time_start = time.time()
        mp.spawn(fn=test, args=(args, ), nprocs=world_size)
        time_elapsed = time.time() - time_start
        print(f'\ntime elapsed: {time_elapsed:.2f} seconds.')

1.4 main() 函数

这里的 main() 函数即上面提到的 spawn() 函数中传入的第一个参数。代码关键部位修改如下:

  • 参数列表更新:添加额外参数 local_rank,该参数无需在 mp.spawn() 函数中传递,系统会自动分配;

  • 进程初始化:调用 init_ddp() 函数实现;

  • BN层同步:调用 convert_sync_batchnorm() 函数用同步的方法完成BN以尽可能模拟单卡场景,尽管会降低GPU利用率,但可以提高模型在多卡场景下的表现(详解见此篇博客);BN层同步的必要性依赖于单卡batch_size的大小,如果单卡batch_size太小,使用SyncBN可以提高性能。但是如果batch_size较大的时候就不需要使用SyncBN, 因为这需要多卡之间通信,会导致训练速度变慢。

  • 数据并行:调用 DistributedDataParallel() 函数实现;

  • 指定混合精度训练:调用 GradScaler() 函数实现,作为参数传至 train() 函数中;

  • 训练采样器设置:每个 epoch 设置不同的 sampling 顺序;

  • 避免副本重复执行:使用 if local_rank==0: 语句进行约束;

  • 消除进程组:调用 destroy_process_group() 函数实现。

def main(local_rank, args):  # 参数列表更新
    init_ddp(local_rank)  ### 进程初始化

    best_macro = 0

    model, tokenizer = initialise_model(args['modelname'], args['num_labels'])
    model.cuda()
    model = nn.SyncBatchNorm.convert_sync_batchnorm(model)  # BN层同步
    
    num_gpus = torch.cuda.device_count()
    if num_gpus > 1:
        print('use {} gpus!'.format(num_gpus))
        model = nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)  ### 套 DDP
    
    num_training_steps = args['num_epochs'] * (args['num_samples'] // args['batch_size']) #总的训练步数
    
    if args['requires_grad']:  # 权重衰减
        param_optimizer = list(model.named_parameters())
        no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight']
        # 设置模型参数的权重衰减
        optimizer_grouped_parameters = [
            {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)],
             'weight_decay': 0.01},
            {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}
        ]
        optimizer = AdamW(optimizer_grouped_parameters, lr=float(args['learning_rate'])) # 部分参数更新
    else:
        optimizer = AdamW(model.parameters(), lr=float(args['learning_rate'])) # 部分参数更新
    
    scheduler = get_linear_schedule_with_warmup(optimizer,  num_warmup_steps=100, num_training_steps=num_training_steps) #创建学习率调度器。
            
    scaler = GradScaler()  ###  用于混合精度训练
    criterion = BCEWithLogitsLoss().cuda() #定义损失函数。

    train_dataloader = get_dataloader(args['traincsvpath'], args, tokenizer, train=True)
    valid_dataloader = get_dataloader(args['valcsvpath'], args, tokenizer, train=False)

    for actual_epoch in trange(args['num_epochs'], desc="Epoch"):
        if local_rank == 0:  ### 防止每个进程都输出一次
            print("begin training of epoch %d / %d" % (actual_epoch + 1, args['num_epochs']))
        
        train_dataloader.sampler.set_epoch(actual_epoch)  # 训练时每次的 sampling 顺序不同
        train(model, train_dataloader, optimizer, scheduler, criterion, actual_epoch, scaler, args)   
        
        if local_rank == 0:
            print(f'begin validating')  
        macro = validate(model, valid_dataloader, criterion, actual_epoch, args) #在验证集上评估模型。
     
        if macro > best_macro:
            best_macro = macro
            if local_rank == 0:  # 防止每个进程都保存一次
                save_model(actual_epoch, model, scaler, args['model_save_dir'] + '/best_macro_model_DDP_direct.pt')
                    
    dist.destroy_process_group()  # 消除进程组,和 init_process_group 相对
  • 除上述修改,main() 函数中使用到的三个函数,get_dataloader() 函数、train() 函数以及 validate() 函数,也需要进行相应更新,下面分别对其进行讲解。

1.5 get_dataloader() 函数

该函数主要对 DataLoader() 函数进行了修改。对「训练」和「测试」两个阶段,分别定义 train_sampler 和 test_sampler,其中,设置 train_sampler 为随机采样,test_sampler 为顺序采样。此外,在「训练」阶段,使用 get_ddp_generator() 函数向 DataLoader() 函数传入参数 generator(作用于不同worker),否则会减弱训练的随机性。

def get_dataloader(path, args, tokenizer, train:bool): 
    '''
    根据给定的路径获取数据,并将数据和训练标志传递给数据加载器,这样可以方便地从给定路径加载数据并生成数据加载器,以供后续的模型训练和评估使用。
    path:数据存放路径
    tokenizer:分词器
    train:是否是训练阶段
    '''
    texts, labels = load_dataset(path, args['num_labels'])
    texts = tokenizer(texts, padding='max_length', truncation=True, return_tensors='pt', max_length=args['max_length']) 
    data = TensorDataset(texts['input_ids'], texts['attention_mask'], torch.tensor(labels)) 
    
    if train:
        train_sampler = DistributedSampler(data, shuffle=True)  # #创建一个随机采样器。
        g = get_ddp_generator()
        dataloader = DataLoader(dataset=data,
                                batch_size=args['batch_size'],
                                num_workers=args['num_workers'],
                                pin_memory=True,
                                shuffle=False,
                                sampler=train_sampler, #采用随机采样器。
                                generator=g) 
        
    else:
        test_sampler = DistributedSampler(data, shuffle=False) #创建一个顺序采样器。
        dataloader = DataLoader(dataset=data,
                                batch_size=args['batch_size'],
                                num_workers=args['num_workers'],
                                pin_memory=True,
                                shuffle=False,
                                sampler=test_sampler #采用顺序采样器。
                                )
    return dataloader

1.6 train() 函数

该函数主要通过 reduce_tensor() 函数对loss进行了取均值操作,并对反向传播的方式进行了修改 —— 通过scaler 对梯度进行缩放,防止由于使用混合精度导致损失下溢,并且对scaler自身的状态进行更新。多个并行进程共用同一个scaler。在模型保存过程中,如果后续需要继续训练(比如预训练-微调模式),最好将scaler 的状态一起保留,并在后续的微调过程中和模型的参数一同加载。

def train(model, train_dataloader, optimizer, scheduler, criterion, actual_epoch, scaler, args):

    model.train()
    
    tr_loss = 0
    num_train_samples = 0
    
    for step, batch in enumerate(train_dataloader):
        
        batch = tuple(t.cuda(non_blocking=True) for t in batch)
        b_input_ids, b_input_mask, b_labels = batch
        with torch.cuda.amp.autocast():
	        output = model(b_input_ids, attention_mask=b_input_mask, labels=b_labels) # 运行到这一行会增加一下显存
	        loss = criterion(output.logits.view(-1,args['num_labels']), b_labels.type_as(output.logits).view(-1,args['num_labels']))
        
        reduced_loss = reduce_tensor(loss.data)  # 对并行进程计算的多个 loss 取平均
        if dist.get_rank() == 0:  # 防止重复输出
            print("\nOutput Loss: ", reduced_loss.item())
        tr_loss += reduced_loss.item()
        
        # 并行状态下的更新,不同进程分别根据自己计算的 loss 更新数据
        optimizer.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optimizer)  #  运行到这一行会增加一下显存
        # 下面四行,多个进程只执行一次
        scheduler.step()
        scaler.update()
        num_train_samples += b_labels.size(0) #将批次中的样本数量添加到 num_train_samples 中。
        torch.cuda.empty_cache()  # 释放GPU reserved memory显存
    
    epoch_train_loss = tr_loss / num_train_samples  # num_train_samples 代表每个进程承接的样本数量,由于上面已经有对loss取平均的操作,这里分母无需再乘以进程数

    if dist.get_rank() == 0:
        print("\nTrain loss after Epoch {} : {}".format(actual_epoch, epoch_train_loss))

1.7 validate() 函数

@torch.no_grad()
def validate(model, valid_dataloader, criterion, epoch, args, threshold=0.5):

    model.eval()

    eval_loss = 0.0
    num_eval_samples = 0
    
    pred_labels = []
    true_labels = []
    
    for step, batch in enumerate(valid_dataloader):
        
        batch = tuple(t.cuda(non_blocking=True) for t in batch)
        b_input_ids, b_input_mask, b_labels = batch
        
        with torch.no_grad():
            with torch.cuda.amp.autocast():
	            output = model(b_input_ids, attention_mask=b_input_mask)
	            logits = output.logits
                        
            loss = criterion(logits.view(-1,args['num_labels']), b_labels.type_as(logits).view(-1,args['num_labels']))
            
            reduced_loss = reduce_tensor(loss.data)
            eval_loss += reduced_loss.item()
            
            pred_label = torch.sigmoid(logits)
            pred_label = pred_label.to('cpu').numpy()
            b_labels = b_labels.to('cpu').numpy()
            
        pred_labels.append(pred_label)
        true_labels.append(b_labels)

        num_eval_samples += b_labels.shape[0]  # 这里是针对单个 进程 的 计算样本数
    
    epoch_eval_loss = eval_loss/num_eval_samples  
    
    if dist.get_rank() == 0:
        print("Validation loss after Epoch {} : {}".format(epoch, epoch_eval_loss))

    # 每个并行进程都会分别执行下列计算操作,得到各进程对应的macro评价指标
    pred_labels = [item for sublist in pred_labels for item in sublist]
    true_labels = [item for sublist in true_labels for item in sublist]
    pred_bools = [pl>threshold for pl in pred_labels]
    true_bools = [tl==1 for tl in true_labels]
    macro = f1_score(true_bools, pred_bools, average='macro')
    
    # 汇总不同进程的实验结果
    macro = reduce_tensor(torch.tensor(macro).cuda())
    
    return macro

1.8 test() 函数

由1.3节可知,我这里的程序是将「训练&验证」与「测试」过程分开,前一阶段保存模型,后一阶段对模型进行验证。所以单独来介绍一下 test() 函数需要修改的内容,这一部分涉及到checkpoint模型加载。加速推理方法详见此篇博客。


@torch.no_grad()
def test(local_rank, args):

    init_ddp(local_rank)  # 进程初始化

    pred_labels = []
    true_labels = []

    if local_rank == 0:
        print(f'begin testing')    

    save_path = args['model_save_dir'] + '/best_macro_model_DDP_direct.pt'
    model, tokenizer = load_model(save_path, args['modelname'], args['num_labels'])
    model.cuda()
    model = nn.SyncBatchNorm.convert_sync_batchnorm(model)  ### 转换模型的 BN 层
    
    num_gpus = torch.cuda.device_count()
    if num_gpus > 1 and local_rank == 0:
        print('use {} gpus!'.format(num_gpus))
        model = nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)  ### 套 DDP

    model.eval()

    test_dataloader = get_dataloader(args['testcsvpath'], args, tokenizer, train=False)
    
    for idx, batch in enumerate(test_dataloader): #遍历测试集的数据加载器。

	......    

    dist.destroy_process_group()  # 消除进程组

到这里,程序就全部修改完毕啦!


2. 程序运行

下面分别介绍 DDP 的几种多卡启动方式。

2.1 mp.spawn() 启动

本程序采用的启动方式是 mp.spawn() 函数,其中mp模块完成对multiprocessing库进行封装,并没有特定针对DDP。

一开始,使用两张 2080 Ti 显卡并行运行程序,然而发现在第 0 个Epoch刚刚启动不久,总是报错 RuntimeError: CUDA out of memory.,如下:

Traceback (most recent call last):
  File "/data/CMLTES_codes/experiment/bloom/BLOOM_DDP.py", line 690, in <module>
    mp.spawn(main, args=(args, ), nprocs=world_size)
  File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 240, in spawn
    return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
  File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 198, in start_processes
    while not context.join():
  File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 160, in join
    raise ProcessRaisedException(msg, error_index, failed_process.pid)
torch.multiprocessing.spawn.ProcessRaisedException: 

-- Process 1 terminated with the following error:
Traceback (most recent call last):
  File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 69, in _wrap
    fn(i, *args)
  File "/data/CMLTES_codes/experiment/bloom/BLOOM_DDP.py", line 603, in main
    epoch_train_loss = train(model, train_dataloader, optimizer, scheduler, loss_func, actual_epoch, scaler, args)
  File "/data/CMLTES_codes/experiment/bloom/BLOOM_DDP.py", line 336, in train
    scaler.scale(loss).backward()  ###
  File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/_tensor.py", line 396, in backward
    torch.autograd.backward(self, gradient, retain_graph, create_graph, inputs=inputs)
  File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/autograd/__init__.py", line 173, in backward
    Variable._execution_engine.run_backward(  # Calls into the C++ engine to run the backward pass
  File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/torch/autograd/function.py", line 253, in apply
    return user_fn(self, *args)
  File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/transformers/models/bloom/modeling_bloom.py", line 188, in backward
    tmp = bloom_gelu_back(grad_output, input)
  File "/root/anaconda3/envs/pytorch77/lib/python3.9/site-packages/transformers/models/bloom/modeling_bloom.py", line 175, in bloom_gelu_back
    ff = 0.5 * x * ((1 - tanh_out * tanh_out) * (0.79788456 + 0.1070322243 * x * x)) + 0.5 * (1 + tanh_out)
RuntimeError: CUDA out of memory. Tried to allocate 32.00 MiB (GPU 1; 10.76 GiB total capacity; 8.83 GiB already allocated; 28.56 MiB free; 8.94 GiB reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF

百思不得其解后,尝试把程序原封不动放到3090上面运行,发现可以正常运行了!这里的经验教训就是GPU 单卡显存对于并行也很重要!一个1.2G左右的模型微调时大约需要40G左右的中间变量来进行反向传播……这是我属实没有想到的情况……


2.2 tochrnn 启动

相较于使用 mp.spawn() 启动,torchrun 会自动控制一些环境变量的设置,因而更为方便。我们只需要设置os.environ[‘CUDA_VISIBLE_DEVICES’] 即可(不设置默认为该机器上的所有GPU),而无需设置os.environ[‘MASTER_ADDR’] 等。此外,main() 函数不再需要 local_rank 参数。程序入口变为:

if __name__ == '__main__':
    
    ......
    
    time_start = time.time()
    main(args)
    time_elapsed = time.time() - time_start
    local_rank = int(os.environ['LOCAL_RANK'])
    if local_rank == 0:
        print(f'\ntime elapsed: {time_elapsed:.2f} seconds')

运行脚本的命令由python变为了torchrun,如下:

torchrun --standalone --nproc_per_node=2 ddp_main_torchrun.py --gpu 0,1

程序能够成功运行之后,还有一些细节问题,下面一一来进行解决。

2.3 torch.distributed.launch() 启动

这种方式代码量更少,启动速度更快。

python -m torch.distributed.launch --nproc_per_node 8 xxx.py
# -m 意思是 run library module as a script
# -nproc_per_node 表示每台机器的进程数

PS:这种方式要被淘汰了:

/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/distributed/launch.py:178: FutureWarning: The module torch.distributed.launch is deprecated
and will be removed in future. Use torchrun.

3. Debug 历程

下面对使用DDP期间遇到的问题进行分析并给出解决办法。

问题一:多进程计算数据收集

由于我这里是将模型复制到双卡上实现数据并行,所以在汇总结果时,需要将不同进程上的数据进行汇总取均值计算。这时就需要用到 1.2 节提到的 all_reduce() 收集函数。

这里要注意⚠️:对于 float 等非张量型数据,如果我们想对其计算多进程的平均值,可以先使用 torch.tensor() 将需要汇总的变量转为 tensor 并使用 .cuda() 命令将其放至 gpu 上,然后调用 all_reduce() 收集函数。详见 1.7 节 validate() 函数中 macro 变量的收集计算。若没有完成数据转换,则会报错如下:

衍生问题:在进行反向传播时,每个进程使用的训练数据是不同的,所以还是需要根据自己当前计算的 loss 分别更新,而不是根据收集函数得到的 loss 值进行更新,否则会报错,也不合逻辑。


问题二:模型加载参数缺失

在 1.4节 main() 函数中,使用「只保存模型参数」的方式存储模型。在测试阶段,用对应方式加载模型时,报错如下:

(CMLTES) ➜  CMLTES git:(master) ✗ python /data/gluo/CMLTES/codes/BLOOM_DDP.py -mode "test"
Model directory for bloom and batch size 4 already exists!
TEST FOR bloom and Batch Size4
[W socket.cpp:558] [c10d] The client socket has failed to connect to [localhost]:19198 (errno: 99 - Cannot assign requested address).
begin testing
Some weights of BloomForSequenceClassification were not initialized from the model checkpoint at /data/gluo/CMLTES/bloom_PRE and are newly initialized: ['score.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of BloomForSequenceClassification were not initialized from the model checkpoint at /data/gluo/CMLTES/bloom_PRE and are newly initialized: ['score.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Traceback (most recent call last):
  File "/data/gluo/CMLTES/codes/BLOOM_DDP.py", line 586, in <module>
    mp.spawn(test, args=(args, ), nprocs=world_size)
  File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 240, in spawn
    return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
  File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 198, in start_processes
    while not context.join():
  File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 160, in join
    raise ProcessRaisedException(msg, error_index, failed_process.pid)
torch.multiprocessing.spawn.ProcessRaisedException: 

-- Process 0 terminated with the following error:
Traceback (most recent call last):
  File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 69, in _wrap
    fn(i, *args)
  File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/autograd/grad_mode.py", line 27, in decorate_context
    return func(*args, **kwargs)
  File "/data/gluo/CMLTES/codes/BLOOM_DDP.py", line 450, in test
    model, tokenizer = load_model(save_path, args['modelname'], args['num_labels']) #加载模型。
  File "/data/gluo/CMLTES/codes/BLOOM_DDP.py", line 95, in load_model
    model.load_state_dict(model_state_dict) #, strict=False) #加载模型的参数。
  File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1604, in load_state_dict
    raise RuntimeError('Error(s) in loading state_dict for {}:\n\t{}'.format(
RuntimeError: Error(s) in loading state_dict for BloomForSequenceClassification:
        Missing key(s) in state_dict: "transformer.word_embeddings.weight", "transformer.word_embeddings_layernorm.weight", "transformer.word_embeddings_layernorm.bias", "transformer.h.0.input_layernorm.weight", "transformer.h.0.input_layernorm.bias", "transformer.h.0.self_attention.query_key_value.weight", "transformer.h.0.self_attention.query_key_value.bias", "transformer.h.0.self_attention.dense.weight", "transformer.h.0.self_attention.dense.bias",

根据此篇博客,这里暂时的处理方法是:将 load_state_dict() 函数修改为:model.load_state_dict(model_state_dict, strict=False),即设置 strict 参数值为 False. strict=False 的含义是:不严格要求 state_dict 中的键与该模块的键返回的键匹配。

上述处理方式可以暂时忽略上述参数缺失问题,但是可能会对模型的性能造成一定程度的影响,这一问题有待后续解决。

PS:关于模型保存与加载的两种方法

根据此篇博客,保存模型有两种方式,一是全量保存模型的全部信息,二是只保存模型的参数,两种保存方式对应的模型加载方式自然也有所差别。

  • 保存模型的全部信息
# 保存模型
checkpoint = {'model': model,\
              'scaler': scaler
                }
torch.save(checkpoint, save_path)

# 加载模型
checkpoint = torch.load(save_path)
model = checkpoint['model']  # 加载模型
  • 只保存模型参数
    与第一种方式不同的是,这种方式在加载模型时,需要首先定义与保存的模型相同的模型结构,然后加载模型参数。
# 保存模型
checkpoint = {'state_dict': model.state_dict(),\
              'scaler': scaler.state_dict()
                }
torch.save(checkpoint, save_path)

# 加载模型
checkpoint = torch.load(save_path, map_location=torch.device('cpu'))
model_state_dict = checkpoint['state_dict']
model.load_state_dict(model_state_dict) #, strict=False) #加载模型参数。

问题三:参数类型转换异常

在 1.4节 main() 函数中,使用「只保存模型参数」的方式存储模型。在测试阶段,用对应方式加载模型时,报错如下:

使用上述方法解决加载模型参数缺失的问题后,随之而来的问题如下所示。

Traceback (most recent call last):
  File "/data/gluo/CMLTES/codes/BLOOM_DDP.py", line 587, in <module>
    mp.spawn(test, args=(args, ), nprocs=world_size)
  File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 240, in spawn
    return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
  File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 198, in start_processes
    while not context.join():
  File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 160, in join
    raise ProcessRaisedException(msg, error_index, failed_process.pid)
torch.multiprocessing.spawn.ProcessRaisedException: 

-- Process 0 terminated with the following error:
Traceback (most recent call last):
  File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 69, in _wrap
    fn(i, *args)
  File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/autograd/grad_mode.py", line 27, in decorate_context
    return func(*args, **kwargs)
  File "/data/gluo/CMLTES/codes/BLOOM_DDP.py", line 459, in test
    model = nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)  ### 套 DDP
  File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/nn/parallel/distributed.py", line 646, in __init__
    _verify_param_shape_across_processes(self.process_group, parameters)
  File "/opt/conda/envs/CMLTES/lib/python3.9/site-packages/torch/distributed/utils.py", line 89, in _verify_param_shape_across_processes
    return dist._verify_params_across_processes(process_group, tensors, logger)
RuntimeError: value cannot be converted to type int without overflow

深度原因有待后续探索。


问题四:参数泄露

报错日志:UserWarning: resource_tracker: There appear to be 1 leaked semaphore objects to clean up at shutdown warnings.warn('resource_tracker: There appear to be %d '

在这里插入图片描述
由上图可知,上述警告产生的原因是使用了 Ctrl+C 中断程序。深度原因有待后续探索。

注意⚠️:在使用PyTorch设置多线程进行数据读取时,后台实际操作情况是开了N个PID连号的子进程模拟多线程工作,所以在程序跑完或者中途kill掉主进程的话,子进程的GPU显存并不会被释放,需要手动一个一个kill才行。

> 本篇博客没有涉及到的知识点:dist.barrier()、Gradient Accumulation、Apex 实现混合精度训练&分布式训练、

后记:本篇博客是我经过不断探索总结而得,其中若有表述不当或表意不明之处,还望各位不吝赐教,我们共同进步!


参考文献

  1. 一看就懂的DDP代码实践 - 知乎 (zhihu.com)
  2. Pytorch DistributedDataParallel简明使用指南 - 知乎 (zhihu.com)
  3. PyTorch DistributedDataParallel 单机多卡训练 踩坑记录
  4. pytorch保存模型的两种方式_SCU-JJkinging的博客-CSDN博客
  5. 加载模型出现 RuntimeError: Error(s) in loading state_dict for Model: Missing key(s) in state_dict_sovits 加载浅扩散模型error_大海Git的博客-CSDN博客
  6. pytorch中model.to(device)和map_location=device的区别_绛洞花主敏明的博客-CSDN博客
  7. RuntimeError: CUDA out of memory.一些调bug路程 - 知乎 (zhihu.com)
  8. pytorch 分布式计算 你们都遇到过哪些 坑/bug? - 知乎 (zhihu.com)
  9. 关于pytorch中的distributedsampler函数使用_DRACO于的博客-CSDN博客
  10. 通过设置PYTORCH_CUDA_ALLOC_CONF中的max_split_size_mb解决Pytorch的显存碎片化导致的CUDA:Out Of Memory问题_梦音Yune的博客-CSDN博客
  11. torch.cuda.amp.autocast()使用示例_生成滞涨网络~的博客-CSDN博客
  12. 可能99%人犯的PyTorch错误 set_seed 会破坏随机性,官方 worker_init_fn 无法解决 - 知乎 (zhihu.com)
  13. 原创 深度 PyTorch DDP系列第三篇:实战与技巧 - 知乎 (zhihu.com)
  14. torch.distributed_Wanderer001的博客-CSDN博客

以下系列资源均来自此博主,可以说是关于数据并行十分详细的教程了!

  1. Pytorch(十一) —— 分布式(多GPU/多卡)训练 并行 (DP & DDP)_pytorch gpu 分布式_hxxjxw的博客-CSDN博客
  2. PyTorch多卡/多GPU/分布式DPP的基本概念(node&rank&local_rank&nnodes&node_rank&nproc_per_node&world_size)_hxxjxw的博客-CSDN博客
  3. torch.distributed多卡/多GPU/分布式DPP(一) —— torch.distributed.launch & all_gather & init_process_group_hxxjxw的博客-CSDN博客
  4. torch.distributed多卡/多GPU/分布式DPP(二)—torch.distributed.all_reduce(reduce_mean)barrier控制进程执行顺序&seed随机种子_torch dpp_hxxjxw的博客-CSDN博客
  5. Pytorch分布式训练/多卡训练DDP——模型初始化(torch.distribute 与 DDP的区别)_pytorch distribute torchtrun-CSDN博客
  6. 多卡训练中的BN(BatchNorm)_多卡 batchnorm_hxxjxw的博客-CSDN博客
  7. 为什么Pytorch多卡训练容易导致GPU显存不释放_hxxjxw的博客-CSDN博客
  8. Pytorch分布式训练/多卡训练(一) —— Data Parallel并行(DP)_model = nn.dataparallel(model, device_ids=[0, 1])_hxxjxw的博客-CSDN博客
  9. Pytorch分布式训练/多卡训练(二) —— Data Parallel并行(DDP)(2.1)(基本概念&代码框架)_slurm_procid_hxxjxw的博客-CSDN博客
  10. Pytorch分布式训练/多卡训练(二) —— Data Parallel并行(DDP)(2.2)(代码示例)(BN同步&主卡保存&梯度累加&多卡测试inference&随机种子seed)_ddp程序中的seed_hxxjxw的博客-CSDN博客
  11. Pytorch分布式训练/多卡训练(二) —— Data Parallel并行(DDP)(2.3)(torch.multiprocessing(spawn) & Apex)_torch.multiprocessing.spawn_hxxjxw的博客-CSDN博客
  12. Pytorch分布式训练/多卡训练(三) —— Model Parallel 模型并行_hxxjxw的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/619545.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网络安全自学笔记

一、怎么入门&#xff1f; 这个 Web 安全学习路线&#xff0c;整体大概半年左右&#xff0c;具体视每个人的情况而定。 &#xff08;上传一直很模糊&#xff0c;所以就没有展开了&#xff0c;需要高清版的可以在下面领取&#xff09; &#x1f449; 【一学习路线高清版一】&a…

Java操作mongodb(含分页,精确查询,模糊查询,时间区间,排序)进行查询

mongodb是常用的非关系型数据库&#xff0c;他经常用来存储文本数据&#xff0c;也就是JSON格式的数据。 不废话&#xff0c;直接上代码。注释写的很详细。&#xff08;有问题留言秒回&#xff09; public Page<Product> listProducts(ProductCond cond) {//如前端没传&a…

赛效:如何自动拼图在线实现多图合一

1&#xff1a;在电脑上打开改图鸭网页版&#xff0c;登录账号后在特色功能里点击“模板拼图”。 2&#xff1a;根据需要图片数量和特点选择对应的拼图模板&#xff0c;然后点击右侧模板里的上传图片。 3&#xff1a;图片添加完成后&#xff0c;除了可以直接在模板里拖动图片进行…

第六章 Electron|Node 实现license激活机制

一、license是什么 ✨ ⭐️ &#x1f31f; license许可证&#xff0c;一般用于软件的授权&#xff0c;我个人的理解就和我们平时的登录差不多。只是说登录时需要我们输入用户名和密码&#xff0c;license一般是开发方提供给你一串加密后的文本&#xff0c;通过这个文本进行一…

Linux5.1 LVS负载均衡群集

文章目录 计算机系统5G云计算第一章 LINUX LVS负载均衡群集一、LVS概述1.群集的含义2.群集的特点3.扩展服务器的方式4.群集的类型5.负载均衡的结构6.负载均衡集群工作模式分析 二、LVS-NAT 的部署1.关于 LVS 虚拟服务器2.LVS的负载调度算法3.使用 ipvsadm 工具 三、NAT模式 LVS…

ChatGPT助力码上行动:零基础学会Python编程

摘要&#xff1a; Python编程作为一种简洁、易学且功能强大的编程语言&#xff0c;正逐渐成为初学者进入编程领域的首选。然而&#xff0c;对于零基础的学习者来说&#xff0c;学习编程仍然存在一定的挑战。本文将介绍如何利用ChatGPT的强大语言生成能力&#xff0c;助力零基础…

元宇宙应用领域-社交

社交是一个古老的话题&#xff0c;人类从最开始的结群&#xff0c;到后来的部落&#xff0c;再到如今的网络社交&#xff0c;可以说人类的社交方式经历了漫长的演化过程。 随着互联网的普及和网络社交方式的不断发展&#xff0c;社交对于人类而言越来越重要。人们在网上不仅可…

SQL语句之DQL语言(二)(多表查询)

准备工作&#xff1a;创建表&#xff0c;添加数据 -- 部门管理 create table tb_dept(id int unsigned primary key auto_increment comment 主键ID,name varchar(10) not null unique comment 部门名称,create_time datetime not null comment 创建时间,update_time datetime…

新招了个从腾讯拿38K离职的测试大佬,让我见识到了什么才是测试界的天花板

现在招个会几年工作经验还会自动化测试的测试工程师真是难呀&#xff0c;10个里面有8个写了会自动化&#xff0c;但一问就是三不知 5年测试工作经验&#xff0c;技术应该是能达到资深测试的水准&#xff0c;即不仅能熟练地开发业务&#xff0c;而且还能熟悉项目的开发&#xff…

【数据结构每日一题】栈——中心对称链

[数据结构习题]栈——中心对称链 &#x1f449;知识点导航&#x1f48e;&#xff1a;【数据结构】栈和队列 &#x1f449;[王道数据结构]习题导航&#x1f48e;&#xff1a; p a g e 70.4 page70.4 page70.4 本节为栈和链表综合练习题 题目描述&#xff1a; &#x1f387;思路…

【论文阅读】dreambooth

简介 目标&#xff1a;subject-driven generation&#xff0c;针对特定物体的图像生成&#xff0c;仅使用少量目标主体图像&#xff0c;dreambooth可以在prompt的指导下生成大量目标主体在不同场景下的图像。例如下图中小狗&#xff0c;我们给定的set就是左侧的input images&a…

Matplotlib - 绘制 高亮显示的饼图 (Highlight Pie Chart) 函数源码

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://blog.csdn.net/caroline_wendy/article/details/131089501 饼图 (Pie Chart) 是一种圆形统计图&#xff0c;被分割成片用于表示数值间的比例关系。每个切片的弧长以及相应的中心角和面积…

【QQ聊天界面-创建自定义Cell Objective-C语言】

一、我们刚才说到这个地方,我们说,用系统的单元格,是不是不够用吧, 1.那么这个时候,我们就要、需要自定义单元格 自定义单元格,我们就新建一个类,继承自UITableViewCell 来,写一下, 那么,这个时候,应该在哪个文件夹下,去新建类啊, 是不是在Views下面吧, 因为…

一文详解!Robot Framework Selenium UI自动化测试入门篇

目录 前言&#xff1a; 自动化框架的选择 测试环境的搭建 导入Selenium2Library包 关键字是什么&#xff1f; 创建测试用例 前言&#xff1a; 自动化测试的重要性越来越受到人们的重视&#xff0c;因为它可以提高测试效率、降低测试成本并减少人为错误的出现。为了满足这…

软考A计划-电子商务设计师-模拟试题卷八

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例 &#x1f449;关于作者 专注于Android/Unity和各种游戏开发技巧&#xff0c;以及各种资源分享&am…

Shiro自定义过滤器会执行两次?看我怎么给你解决

关注“Java架构栈”微信公众号&#xff0c;回复暗号【Java面试题】即可获取大厂面试题。 最近九哥的一个学生在使用自定义ShiroFilter处理JWT校验时&#xff0c;发现自己写的Filter在处理一次请求时会被执行两次。这个问题困扰了他一个下午都没有解决&#xff0c;最后不得不求…

新手小白如何入门学习CTF?【网络安全】

最近有很多新手小白私信我&#xff1a;如何学习CTF&#xff1f;新手小白应该怎么入门CTF&#xff1f;想打CTF&#xff0c;但是没有思路怎么办&#xff1f; 昨天下班之后&#xff0c;花了几个小时&#xff0c;整理了一下CTF学习的思路与方法&#xff0c;分享给大家&#xff0c;如…

用jprofiler来分析 jvm 堆 内存泄露,fullgc

jvm 命令和工具_个人渣记录仅为自己搜索用的博客-CSDN博客 堆太大? 方法1: 重新设置堆后,重启,复现. 方法2: 切割 split -b 1M heap.bin smallfilescp smallfile* usernamemac-host:/Users/username/cat smallfile* > heap.bin官网文档 JProfiler Help - HPROF snapshot…

【逃离】UniAccess

能看到这篇文章&#xff0c;说明你已经是老屁股了&#xff08;保命要紧&#xff09; 上面是UniAccess功能 你想要做的事情无非是三种 不顾后果强力卸载UniAccess期望只保留(内网)网络认证禁用UniAccess部分功能 第一种&#xff1a;直接卸载&#xff0c;这里不做说明了&#x…

开发轮子(一):全国省/市/区/街道三、四级联动

概述 本服务提供中国标准行政区划数据查询功能&#xff0c;支持&#xff1a; 1 . 全国省、市、区/县、乡镇/街道 四级行政区划数据&#xff1b; 2 . 支持三级区划&#xff08;省/市 - 区/县&#xff09;轮廓数据&#xff1b; 3 . 支持区划查询、省市区列表、查询子级区划等功能…