Pytorch 多卡并行(3)—— 使用 DDP 加速 minGPT 训练

news2025/1/10 21:38:20
  • 前文 并行原理简介和 DDP 并行实践 和 使用 torchrun 进行容错处理 在简单的随机数据上演示了使用 DDP 并行加速训练的方法,本文考虑一个更加复杂的 GPT 类模型,说明如何进行 DDP 并行实战
  • MinGPT 是 GPT 模型的一个流行的开源 PyTorch 复现项目,其实现简洁干净可解释,因而颇具教育意义。关于 MinGPT 的详细介绍可以参考 minGPT 代码详解(训练 GPT 模型执行两位数加法)
  • 本文参考自:Pytorch 官方教程
  • 完整代码下载:wxc971231/ddp-tutorial-series

文章目录

  • 0. 项目组织
  • 1. 参数准备
  • 2. 数据准备
  • 3. 程序入口
  • 4. 定义模型
  • 5. 定义 Trainer

0. 项目组织

  • 本文改写 MinGPT 库中的 chargpt 例程,这是一个 character-level 语言模型项目,组织如下
    在这里插入图片描述
  • 说明一下主要文件内容
    1. data/input.txt 是训练用的数据集
    2. char_dataset.py 定义了一个 char-level 的 torch.utils.data.Dataset
    3. gpt_snapshot.pt 是程序运行过程中保存的快照,使用 torchrun 时可以从此重启所有进程的训练
    4. gpt2_train_cfg.yaml 是 yaml 配置文件,记录了训练超参数
    5. main.log 是 hydra 生成的 logging 文件
    6. main.py 是程序入口,符合前文 使用 torchrun 进行容错处理 第1节给出的标准形式
    7. model.py 定义了 GPT 模型结构和 optimizer 的构造方法
    8. trainer.py 定义了训练过程,包括快照保存和加载等操作

1. 参数准备

  • 本项目使用 YAML文件存储超参数设置。YAML 是一种轻量级的数据序列化格式。相较于JSON等其他格式,YAML 更加易读易写,也更加适合用于配置文件等场景。YAML的语法结构主要包含键值对、列表、注释等几种元素
    data_config:
      path: ./data/input.txt
      block_size: 128   # 输入序列长度
      train_split: 0.9  # 训练集和测试集划分
      truncate: 0.02    # 只用5%的数据进行训练
    gpt_config:
      n_layer: 8
      n_head: 8
      n_embd: 512       
    trainer_config:
      max_epochs: 10
      batch_size: 216
      data_loader_workers: 4
      grad_norm_clip: 1.0
      snapshot_path: gpt_snapshot.pt
      save_every: 3
      use_amp: True
    optimizer_config:
      weight_decay: 0.1
      learning_rate: 0.0003
    
    hydra:
      run:
        dir: ./
    

    使用yaml文件时,可以使用 ${node.key} 的方式引用yaml中的其他变量;如果超参数的值缺失,可以使用 ??? 输入缺失值,或使用 null 输入空值。

  • 使用 Hydra 来管理超参数,它可以以装饰器的形式方便地加载不同路径下的 yaml 配置文件,最小用例如下
    import hydra
    from omegaconf import DictConfig
    
    @hydra.main(version_base=None, config_path='configs', config_name='config')
    def main(cfg: DictConfig) -> None:
        cfg['key'] # 获得对应的参数值
    
    if __name__ == '__main__':
        main()
    
    这样就把 ./configs/config.yaml 文件的参数加载到 main 函数中了,使用 cfg['key'] 这样的形式获取参数值
  • 使用 Hydra 还有一个好处是它对 logging 标准库进行了包装。在 hydra.main 装饰器中对 log 输出格式规范为 "[%(asctime)s][%(name)s][%(levelname)s] - %(message)s",并设置 level 为 info,运行程序就会自动生成 main.log 日志文件。可以通过命令行的hydra.verbose 参数修改 log 的显示 level

2. 数据准备

  • 使用的数据是 tiny-shakespear 数据集,它是一个记录了一些英文对话的文本文档,截取如下
    First Citizen:
    Before we proceed any further, hear me speak.
    
    All:
    Speak, speak.
    
    First Citizen:
    You are all resolved rather to die than to famish?
    
    All:
    Resolved. resolved.
    
    First Citizen:
    First, you know Caius Marcius is chief enemy to the people.
    
    All:
    We know't, we know't.
    
    First Citizen:
    Let us kill him, and we'll have corn at our own price.
    Is't a verdict?
    
    All:
    No more talking on't; let it be done: away, away!
    
  • 下面来构造数据集,思路是把 txt 文件中所有字符去重排序生成 vocab table;样本生成时先把 txt 内容全部读取进来,然后构造 n-gram 样本。如下
    import torch
    from torch.utils.data import Dataset
    import fsspec
    from dataclasses import dataclass
    
    """
    Adapted from https://github.com/karpathy/minGPT/blob/master/projects/chargpt/chargpt.py
    """
    
    @dataclass
    class DataConfig:
        path: str = None
        block_size: int = None      # 输入序列长度    
        train_split: float = None   # 训练集和测试集划分
        truncate: float = 1.0       # 用于训练的数据占全体数据的比例
    
    class CharDataset(Dataset):
    
        def __init__(self, data_cfg: DataConfig): #data_path: str, block_size):
            # 加载所需比例的数据
            data = fsspec.open(data_cfg.path).open().read().decode('utf-8')
            data = data[ : int(len(data) * data_cfg.truncate)]
    
            # Set 去重,转 list 后排序得到数据集中的唯一字符列表作为词表
            chars = sorted(list(set(data))) 
            data_size, vocab_size = len(data), len(chars)
            print('Data has %d characters, %d unique.' % (data_size, vocab_size))
    
            # 得到字符和词表索引之间的双射
            self.stoi = {ch: i for i, ch in enumerate(chars)}   # 字符 -> 词表索引
            self.itos = {i: ch for i, ch in enumerate(chars)}   # 词表索引 -> 字符
            
            self.block_size = data_cfg.block_size  	# 模型输入序列长度
            self.vocab_size = vocab_size			# 词表尺寸
            self.data = data
    
        def __len__(self):
            return len(self.data) - self.block_size
    
        def __getitem__(self, idx):
            # grab a chunk of (block_size + 1) characters from the data
            chunk = self.data[idx:idx + self.block_size + 1]
            
            # encode every character to an integer
            dix = [self.stoi[s] for s in chunk]
            x = torch.tensor(dix[:-1], dtype=torch.long)
            y = torch.tensor(dix[1:], dtype=torch.long)
            return x, y
    

3. 程序入口

  • 使用 torchrun 命令进行容错,按前文 使用 torchrun 进行容错处理 给出的标准形式来编写程序入口(mian.py),如下
    import os
    import torch
    from torch.utils.data import random_split
    from torch.distributed import init_process_group, destroy_process_group
    from model import GPT, GPTConfig, OptimizerConfig, create_optimizer
    from trainer import Trainer, TrainerConfig
    from char_dataset import CharDataset, DataConfig
    from omegaconf import DictConfig
    import hydra
    
    
    def ddp_setup():
        init_process_group(backend="nccl")
        torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
    
    def get_train_objs(gpt_cfg: GPTConfig, opt_cfg: OptimizerConfig, data_cfg: DataConfig):
        dataset = CharDataset(data_cfg)
        train_len = int(len(dataset) * data_cfg.train_split)
        train_set, test_set = random_split(dataset, [train_len, len(dataset) - train_len])
    
        gpt_cfg.vocab_size = dataset.vocab_size
        gpt_cfg.block_size = dataset.block_size
        model = GPT(gpt_cfg)
        optimizer = create_optimizer(model, opt_cfg)
        
        return model, optimizer, train_set, test_set
     
    @hydra.main(version_base=None, config_path=".", config_name="gpt2_train_cfg")
    def main(cfg: DictConfig):
        # 初始化进程池
        ddp_setup()
    
        # 从 yaml 文件读取超参数
        gpt_cfg = GPTConfig(**cfg['gpt_config'])
        opt_cfg = OptimizerConfig(**cfg['optimizer_config'])
        data_cfg = DataConfig(**cfg['data_config'])
        trainer_cfg = TrainerConfig(**cfg['trainer_config'])
    
        # 创建训练对象
        model, optimizer, train_data, test_data = get_train_objs(gpt_cfg, opt_cfg, data_cfg)
        trainer = Trainer(trainer_cfg, model, optimizer, train_data, test_data)
        
        # 开始训练
        trainer.train()
    
        # 训练完成后,销毁进程池
        destroy_process_group()
    
    
    if __name__ == "__main__":
        main()
    
    '''
    运行命令: 
        .py
    '''
    
  • 注意其中使用 hydra.main 装饰器来加载参数;运行时使用以下命令指定 GPU 运行
    CUDA_VISIBLE_DEVICES=1,2 torchrun --standalone --nproc_per_node=gpu main.py
    

4. 定义模型

  • 整个模型定义部分相比 MinGPT 原始代码逻辑上没有区别,只是换了一下写法看起来更清晰一点。首先定义两个 @dataclass 保存模型和优化器参数
    from dataclasses import dataclass
    import math
    import torch
    import torch.nn as nn
    from torch.nn import functional as F
    
    @dataclass
    class GPTConfig:
        model_type: str = 'gpt2'
        # model configurations
        n_layer: int = None
        n_head: int = None
        n_embd: int =  None
        # openai's values for gpt2
        vocab_size: int = 50257 
        block_size: int = 1024
        # dropout hyperparameters
        embd_pdrop: float = 0.1
        resid_pdrop: float = 0.1
        attn_pdrop: float = 0.1
    
    @dataclass
    class OptimizerConfig:
        learning_rate: float = 3e-4
        weight_decay: float = 0.1
    
  • 定义多头 masked self-attention 模块,原本 MinGPT 库是全部手写的,这里则用了 pytorch 自己的多头注意力模块。具体做法是使用 torch.nn.MultiheadAttention 定义普通多头注意力层,在 forward 方法中用同一个序列输入构造 qkv 实现 self-attention,再用过对注意力输出设置遮盖实现 mask
    class MultiheadAttentionLayer(nn.Module):
        """
        A multi-head masked self-attention layer with a projection at the end.
        """
    
        def __init__(self, config, device="cpu", dtype=torch.float32):
            super().__init__()
            assert config.n_embd % config.n_head == 0
            self.resid_drop = nn.Dropout(config.resid_pdrop)
            
            # output projection
            self.c_proj = nn.Linear(config.n_embd, config.n_embd, device=device, dtype=dtype)
    
            # Causal mask。注意这个mask是通过 self.register_buffer 方法登记的
            # 这样登记过的张量可以求梯度也可以随模型在 CPU/GPU 之间移动,但是不进行参数优化
            self.register_buffer("mask", torch.tril(torch.ones(config.block_size, config.block_size))
                                 .view(1, 1, config.block_size, config.block_size))
            
            self.attn = torch.nn.MultiheadAttention(
                embed_dim=config.n_embd,
                num_heads=config.n_head,
                dropout=config.attn_pdrop,
                batch_first=True,
                device=device,
                dtype=dtype
            )
    
        def forward(self, x):
            _, seq_size, _ = x.size()   # batch size, sequence length, embedding dimensionality (n_embd)
            y = self.attn(x, x, x, attn_mask=self.mask[0, 0, :seq_size, :seq_size])[0]
            y = self.resid_drop(self.c_proj(y))
            return y
    

    我感觉这里 self.attn(x, x, x, attn_mask=self.mask[0, 0, :seq_size, :seq_size])[0] 的调用有问题,因为 torch.nn.MultiheadAttention 的前向过程需要输入 query,key 和 value 三个 tensor,这里应该把 x 用三个线性层变换后再作为输入。如果读者有其他想法可以和我讨论。考虑到本文主要说明 DDP 并行,暂不关注此问题

  • 定义 Transformer block
    class Block(nn.Module):
        """ an unassuming Transformer block """
        def __init__(self, config: GPTConfig):
            super().__init__()
            self.ln1 = nn.LayerNorm(config.n_embd)
            self.ln2 = nn.LayerNorm(config.n_embd)
            self.attn = MultiheadAttentionLayer(config)
            self.mlp = nn.Sequential(
                nn.Linear(config.n_embd, 4 * config.n_embd),
                nn.GELU(),
                nn.Linear(4 * config.n_embd, config.n_embd),
                nn.Dropout(config.resid_pdrop),
            )
    
        def forward(self, x):
            x = x + self.attn(self.ln1(x))
            x = x + self.mlp(self.ln2(x))
            return x
    
  • 定义字符嵌入层,用 nn.Embedding 嵌入 token,再设置一个 nn.Parameter 作为可学习的位置编码
    class EmbeddingStem(nn.Module):
        def __init__(self, config: GPTConfig, device="cpu", dtype=torch.float32):
            super().__init__()
            self.tok_emb = nn.Embedding(config.vocab_size, config.n_embd, device=device, dtype=dtype)
            self.pos_emb = nn.Parameter(torch.zeros(1, config.block_size, config.n_embd, device=device, dtype=dtype))
            self.drop = nn.Dropout(config.embd_pdrop)
            self.block_size = config.block_size
    
        def reset_parameters(self): 
            self.tok_emb.reset_parameters() # 将 nn.Embedding 层参数初始化为正态分布采样
    
        def forward(self, idx):
            b, t = idx.size()
            assert t <= self.block_size, f"Cannot forward sequence of length {t}, block size is only {self.block_size}"
    
            token_embeddings = self.tok_emb(idx)            # each index maps to a (learnable) embedding vector
            position_embeddings = self.pos_emb[:, :t, :]    # each position maps to a (learnable) position vector
            return self.drop(token_embeddings + position_embeddings)
    
  • 把以上组件合在一起,定义 GPT 模型
    class GPT(nn.Module):
        """ GPT Language Model """
    
        def __init__(self, config: GPTConfig):
            super().__init__()
            self.block_size = config.block_size
            config = self._set_model_config(config)
    
            # input embedding stem
            self.emb_stem = EmbeddingStem(config)
            # transformer
            self.blocks = nn.Sequential(*[Block(config) for _ in range(config.n_layer)])
            # decoder head
            self.ln_f = nn.LayerNorm(config.n_embd)
            self.head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
    
            # init all weights, and apply a special scaled init to the residual projections, per GPT-2 paper
            self.apply(self._init_weights)
            for pn, p in self.named_parameters():
                if pn.endswith('c_proj.weight'):
                    p.data.normal_(mean=0.0, std=0.02/math.sqrt(2 * config.n_layer))
    
            # report number of parameters (note we don't count the decoder parameters in lm_head)
            n_params = sum(p.numel() for p in self.blocks.parameters())
            print("number of parameters: %.2fM" % (n_params/1e6,))
    
        def _set_model_config(self, config):
            type_given = config.model_type is not None
            params_given = all([config.n_layer is not None, config.n_head is not None, config.n_embd is not None])
            # assert type_given ^ params_given # exactly one of these (XOR)
            if type_given and not params_given:
                # translate from model_type to detailed configuration
                config.__dict__.update({
                    # names follow the huggingface naming conventions
                    # GPT-1
                    'openai-gpt':   dict(n_layer=12, n_head=12, n_embd=768),  # 117M params
                    # GPT-2 configs
                    'gpt2':         dict(n_layer=12, n_head=12, n_embd=768),  # 124M params
                    'gpt2-medium':  dict(n_layer=24, n_head=16, n_embd=1024), # 350M params
                    'gpt2-large':   dict(n_layer=36, n_head=20, n_embd=1280), # 774M params
                    'gpt2-xl':      dict(n_layer=48, n_head=25, n_embd=1600), # 1558M params
                    # Gophers
                    'gopher-44m':   dict(n_layer=8, n_head=16, n_embd=512),
                    # (there are a number more...)
                    # I made these tiny models up
                    'gpt-mini':     dict(n_layer=6, n_head=6, n_embd=192),
                    'gpt-micro':    dict(n_layer=4, n_head=4, n_embd=128),
                    'gpt-nano':     dict(n_layer=3, n_head=3, n_embd=48),
                }[config.model_type])
            return config
        
        def _init_weights(self, module):
            if isinstance(module, (nn.Linear, nn.Embedding)):
                module.weight.data.normal_(mean=0.0, std=0.02)
                if isinstance(module, nn.Linear) and module.bias is not None:
                    module.bias.data.zero_()
            elif isinstance(module, nn.LayerNorm):
                module.bias.data.zero_()
                module.weight.data.fill_(1.0)
    
        def forward(self, idx, targets=None):
            x = self.emb_stem(idx)
            x = self.blocks(x)
            x = self.ln_f(x)
            logits = self.head(x)
    
            # if we are given some desired targets also calculate the loss
            loss = None
            if targets is not None:
                loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)
    
            return logits, loss
    
        @torch.no_grad()
        def generate(self, idx, max_new_tokens, temperature=1.0, do_sample=False, top_k=None):
            """
            Take a conditioning sequence of indices idx (LongTensor of shape (b,t)) and complete
            the sequence max_new_tokens times, feeding the predictions back into the model each time.
            Most likely you'll want to make sure to be in model.eval() mode of operation for this.
            """
            for _ in range(max_new_tokens):
                # if the sequence context is growing too long we must crop it at block_size
                idx_cond = idx if idx.size(1) <= self.block_size else idx[:, -self.block_size:]
                # forward the model to get the logits for the index in the sequence
                logits, _ = self(idx_cond)
                # pluck the logits at the final step and scale by desired temperature
                logits = logits[:, -1, :] / temperature
                # optionally crop the logits to only the top k options
                if top_k is not None:
                    v, _ = torch.topk(logits, top_k)
                    logits[logits < v[:, [-1]]] = -float('Inf')
                # apply softmax to convert logits to (normalized) probabilities
                probs = F.softmax(logits, dim=-1)
                # either sample from the distribution or take the most likely element
                if do_sample:
                    idx_next = torch.multinomial(probs, num_samples=1)
                else:
                    _, idx_next = torch.topk(probs, k=1, dim=-1)
                # append sampled index to the running sequence and continue
                idx = torch.cat((idx, idx_next), dim=1)
    
            return idx
    
  • 最后我们来定义优化器,
    def create_optimizer(model: torch.nn.Module, opt_config: OptimizerConfig):
        """
        This long function is unfortunately doing something very simple and is being very defensive:
        We are separating out all parameters of the model into two buckets: those that will experience
        weight decay for regularization and those that won't (biases, and layernorm/embedding weights).
        We are then returning the PyTorch optimizer object.
        """
    
        # separate out all parameters to those that will and won't experience regularizing weight decay
        decay = set()
        no_decay = set()
        whitelist_weight_modules = (torch.nn.Linear, )
        blacklist_weight_modules = (torch.nn.LayerNorm, torch.nn.Embedding)
        for mn, m in model.named_modules():
            for pn, p in m.named_parameters():
                fpn = '%s.%s' % (mn, pn) if mn else pn # full param name
                # random note: because named_modules and named_parameters are recursive
                # we will see the same tensors p many many times. but doing it this way
                # allows us to know which parent module any tensor p belongs to...
                if pn.endswith('bias'):
                    # all biases will not be decayed
                    no_decay.add(fpn)
                elif pn.endswith('weight') and isinstance(m, whitelist_weight_modules):
                    # weights of whitelist modules will be weight decayed
                    decay.add(fpn)
                elif pn.endswith('in_proj_weight'):
                    # MHA projection layer
                    decay.add(fpn)
                elif pn.endswith('weight') and isinstance(m, blacklist_weight_modules):
                    # weights of blacklist modules will NOT be weight decayed
                    no_decay.add(fpn)
                elif pn.endswith('pos_emb'):
                    # positional embedding shouldn't be decayed
                    no_decay.add(fpn)
    
        # validate that we considered every parameter
        param_dict = {pn: p for pn, p in model.named_parameters()}
        inter_params = decay & no_decay
        union_params = decay | no_decay
        assert len(inter_params) == 0, "parameters %s made it into both decay/no_decay sets!" % (str(inter_params), )
        assert len(param_dict.keys() - union_params) == 0, "parameters %s were not separated into either decay/no_decay set!" \
                                                    % (str(param_dict.keys() - union_params), )
    
        # create the pytorch optimizer object
        optim_groups = [
            {"params": [param_dict[pn] for pn in sorted(list(decay))], "weight_decay": opt_config.weight_decay},
            {"params": [param_dict[pn] for pn in sorted(list(no_decay))], "weight_decay": 0.0},
        ]
        optimizer = torch.optim.AdamW(optim_groups, lr=opt_config.learning_rate, betas=(0.9, 0.95))
        return optimizer
    
    这里主要是通过权重衰减方法来进行正则化避免过拟合。注意到作者通过一个二重遍历考察 GPT 模型所有 sub module 的所有 parameters,仅对所有 torch.nn.Linear 层的 weight 参数进行衰减,bias 参数及所有 torch.nn.LayerNormtorch.nn.Embedding 模块的参数都不做处理。由于模块是递归组织的,这个二重遍历会重复访问很多参数,所以通过 set 自动去重,最后根据处理结果定义 torch.optim.AdamW 优化器返回

    关于权重衰减的理论说明,参考:机器学习基础(6)—— 使用权重衰减和丢弃法缓解过拟合问题

5. 定义 Trainer

  • Trainer 定义和原始 MinGPT 库主要有两个区别

    1. 按指定周期要求 rank0 进程保存 snapshot,本项目中应包含 epoch、模型参数和优化器参数三部分内容;初始化 Trainer 时应当加载可能存在的 snapshot 文件,这样在 torchrun 自动重启进程时可以从最近的 snapshot 恢复训练

    2. 可以使用 torch.cuda.amp.GradScaler 进行混合精度训练

      • 混合精度训练(Mixed Precision Training)是一种训练深度学习模型的技术,旨在提高模型的训练速度和效率。它利用了现代GPU可以混合计算精度的硬件特性,使用FP16数据类型对模型中的某些操作进行加速。具体而言,模型的参数通常使用FP32数据类型,而输入数据和梯度则使用FP16数据类型,从而减少内存开销,加速计算速度,提高模型的训练效率。此外,混合精度训练还可以通过减少浮点运算和内存访问,降低能源消
      • 混合精度训练的主要困难在于 fp16 的表示范围有限,在训练中常出现溢出问题,尤其是下溢出,因为在网络训练的后期,模型的梯度往往很小;另外还有舍入误差问题,这是指当梯度过小,小于当前区间内的最小间隔时,该次梯度更新可能会失效
      • 解决以上问题的方法包括损失缩放FP32权重备份等,前者对计算出的 loss 值进行缩放(scale),这样梯度也会被缩放进而平移到 FP16 的有效范围内存储,在进行梯度更新之前先将缩放后的梯度转化为 FP32 再unscale回去;后者将模型权重、激活值、梯度等数据用 FP16 来存储,同时维护一份 FP32 的模型权重副本用于更新。在反向传播得到 FP16 的梯度以后,将其转化成 FP32 并 unscale,最后更新 FP32 的模型权重。因为整个更新过程是在 FP32 的环境中进行的,所以不会出现舍入误差
      • 有一些代码库可以帮助我们快速实现混合精度训练,而无需大幅修改代码,包括 nvidia 的 apex 库和 pytorch 1.6 后引入的 amp 库等

      本项目使用 pytorch 的 amp 库进行混合精度训练,主要用到 GradScaler 和 autocast 两个组件。其中 Gradscalar 对会检查梯度是否发现溢出,并对优化器进行控制 (将丢弃的batches转换为 no-op);autocast 是一个上下文管理器,当进入 autocast 上下文后,tensor 的数据类型会自动转换为半精度浮点型,从而在不损失训练精度的情况下加快运算,而不需要手动调用 .half()。 一个最小实践示例为

      from torch.cuda.amp import autocast as autocast, GradScaler
      '''
      other code
      '''
       
      # 在训练最开始之前实例化一个GradScaler对象
      scaler = GradScaler()
      '''
      other code
      '''
              # 前向过程(model + loss)开启 autocast
              with autocast():
                  output = model(input)
                  loss = loss_fn(output, target)
       
              # Scales loss,这是因为半精度的数值范围有限,因此需要用它放大
              scaler.scale(loss).backward()
       
              # scaler.step() unscale之前放大后的梯度,但是scale太多可能出现inf或NaN
              # 故其会判断是否出现了inf/NaN
              # 如果梯度的值不是 infs 或者 NaNs, 那么调用optimizer.step()来更新权重,
              # 如果检测到出现了inf或者NaN,就跳过这次梯度更新,同时动态调整scaler的大小
              scaler.step(optimizer)
       
              # 查看是否要更新scaler,这个要注意不能丢
              scaler.update()
       
      '''
      other code
      '''
      
  • 下面开始分析 trainer 代码,首先定义两个 @dataclass 存储 Trainer 参数和 snapshot 参数

    @dataclass
    class TrainerConfig:
        max_epochs: int = None
        batch_size: int = None
        data_loader_workers: int = None
        grad_norm_clip: float = None
        snapshot_path: Optional[str] = None
        save_every: int = None
        use_amp: bool = None
    
    @dataclass
    class Snapshot:
        model_state: 'OrderedDict[str, torch.Tensor]'
        optimizer_state: Dict[str, Any]
        finished_epoch: int
    
  • 定义 Trianer 的初始化方法

    class Trainer:
        def __init__(self, trainer_config: TrainerConfig, model, optimizer, train_dataset, test_dataset=None):
            self.config = trainer_config
            # set torchrun variables
            self.local_rank = int(os.environ["LOCAL_RANK"]) # 在所有node的所有进程中当前GPU进程的rank
            self.global_rank = int(os.environ["RANK"])      # 在当前node中当前GPU进程的rank
            
            # data stuff
            self.train_dataset = train_dataset
            self.train_loader = self._prepare_dataloader(train_dataset)
            self.test_loader = self._prepare_dataloader(test_dataset) if test_dataset else None
            
            # initialize train states
            self.epochs_run = 0
            self.model = model.to(self.local_rank)
            self.optimizer = optimizer        
            self.save_every = self.config.save_every
    
            # load snapshot if available. only necessary on the first node.
            if self.config.snapshot_path is None:
                self.config.snapshot_path = "snapshot.pt"
            self._load_snapshot()
    
            # wrap with DDP. this step will synch model across all the processes.
            self.model = DDP(self.model, device_ids=[self.local_rank])
    
            # torch.cuda.amp.GradScaler 是一个用于自动混合精度训练的 PyTorch 工具,它可以帮助加速模型训练并减少显存使用量
            # 具体来说,GradScaler 可以将梯度缩放到较小的范围,以避免数值下溢或溢出的问题,同时保持足够的精度以避免模型的性能下降
            if self.config.use_amp: 
                self.scaler = torch.cuda.amp.GradScaler()
    

    注意几点

    1. torchrun 帮助我们自动分发进程,通过环境变量获取当前运行代码的 GPU rank 信息
    2. 初始化 Trainer 时加载可能存在的 snapshot,实现断点续训
    3. 模型使用 DDP 进行包装
    4. 定义混合精度训练所需的 torch.cuda.amp.GradScaler()
  • 定义 DataLoder,注意使用 DistributedSampler 来分发训练数据

    def _prepare_dataloader(self, dataset: Dataset):
       return DataLoader(
            dataset,
            batch_size=self.config.batch_size,
            pin_memory=True,
            shuffle=False,
            num_workers=self.config.data_loader_workers,
            sampler=DistributedSampler(dataset)                 # 这个 sampler 自动将数据分块后送个各个 GPU,它能避免数据重叠
        )
    
  • 定义 snapshot 的加载和保存方法

    def _save_snapshot(self, epoch):
    	# capture snapshot
    	model = self.model
    	raw_model = model.module if hasattr(model, "module") else model
    	snapshot = Snapshot(
    	    model_state=raw_model.state_dict(),
    	    optimizer_state=self.optimizer.state_dict(),
    	    finished_epoch=epoch
    	)
    	# save snapshot
    	snapshot = asdict(snapshot)
    	torch.save(snapshot, self.config.snapshot_path)
    	print(f"Snapshot saved at epoch {epoch}")
    
    def _load_snapshot(self):
        try:
            snapshot = fsspec.open(self.config.snapshot_path)   # fsspec 为各种后端存储系统提供统一的 Python 接口,可以用相同的语法打开本地、AWS S3 和 GCS 等各种云存储平台的文件
            with snapshot as f:
                snapshot_data = torch.load(f, map_location="cpu")
        except FileNotFoundError:
            print("Snapshot not found. Training model from scratch")
            return 
    
        snapshot = Snapshot(**snapshot_data)
        self.model.load_state_dict(snapshot.model_state)
        self.optimizer.load_state_dict(snapshot.optimizer_state)
        self.epochs_run = snapshot.finished_epoch
        print(f"Resuming training from snapshot at Epoch {self.epochs_run}")
    
  • 定义训练流程

    def _run_batch(self, source, targets, train: bool = True) -> float:
        with torch.set_grad_enabled(train), torch.amp.autocast(device_type="cuda", dtype=torch.float16, enabled=(self.config.use_amp)):
            _, loss = self.model(source, targets)
        
        if train:
            self.optimizer.zero_grad(set_to_none=True)
            if self.config.use_amp: 
                self.scaler.scale(loss).backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.grad_norm_clip)
                self.scaler.step(self.optimizer)
                self.scaler.update()
            else:
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.grad_norm_clip)
                self.optimizer.step()
        
        #return loss.item()
        return loss
    
    def _run_epoch(self, epoch: int, dataloader: DataLoader, train: bool = True):
        dataloader.sampler.set_epoch(epoch)
        for iter, (source, targets) in enumerate(dataloader):
            step_type = "Train" if train else "Eval"
            source = source.to(self.local_rank)
            targets = targets.to(self.local_rank)
            batch_loss = self._run_batch(source, targets, train)
            if iter % 100 == 0:
                #print(f"[GPU{self.global_rank}] Epoch {epoch} | Iter {iter} | {step_type} Loss {batch_loss.item():.5f}")
                if train:
                    print(f"[GPU{self.global_rank}] Epoch {epoch} | Iter {iter} | {step_type} Loss {batch_loss.item():.5f}")
                else:
                    eval_loss_list = [torch.zeros_like(batch_loss) for _ in range(int(os.environ['WORLD_SIZE']))]
                    dist.gather(
                        batch_loss,
                        eval_loss_list if self.local_rank == 0 else None, 
                        dst=0
                    )
                    if self.local_rank == 0:
                        for i, loss in enumerate(eval_loss_list):
                            print(f"[GPU{i}] Epoch {epoch} | Iter {iter} | {step_type} Loss {loss.item():.5f}")
    
    def train(self):
        for epoch in range(self.epochs_run, self.config.max_epochs):
            epoch += 1
            
            # train for one epoch
            self._run_epoch(epoch, self.train_loader, train=True)
    
            # 各个 GPU 上都在跑一样的训练进程,这里指定 rank0 进程保存 snapshot 以免重复保存
            if self.local_rank == 0 and epoch % self.save_every == 0:
                self._save_snapshot(epoch)
    
            # eval run
            if self.test_loader:
                self._run_epoch(epoch, self.test_loader, train=False)
    

    这里需要注意几点:

    1. 指定 rank0 进程保存 snapshot 以免重复保存
    2. _run_batch 方法中,计算 loss 的部分设置在 torch.amp.autocast 上下文中,启动混合精度训练
    3. _run_epoch 方法中,使用 torch.distributed.gather 原语汇聚各个 GPU 的验证损失信息到 rank0 上,常用这种操作进行 log 训练信息。除此以外 Pytorch 一共提供了六个进程通信原语,如下
      import torch.distributed as dist
      
      dist.broadcast(tensor, src, group)				# 将 tensor 从 src 复制到所有其他进程。
      dist.reduce(tensor, dst, op, group)				# 将 op 应用于每个 tensor 并将结果存储在 dst 中。
      dist.all_reduce(tensor, op, group)				# 与 reduce 相同,但结果存储在所有进程中。
      dist.scatter(tensor, scatter_list, src, group)	# 复制  tensor scatter_lost[i] 到  进程
      dist.gather(tensor,gather_list, dst, group)		# 从 dst 中的所有进程复制 tensor。
      dist.all_gather(tensor_list, tensor, group)		# 将所有进程的 tensor 复制到所有进程上的 tensor_list。
      dist.barrier(group)								# 阻塞组中的所有进程,直到每个进程都进入该函数。
      
      其中 op 操作有四种
      dist.ReduceOp.SUM,
      dist.ReduceOp.PRODUCT,
      dist.ReduceOp.MAX,
      dist.ReduceOp.MIN.
      
      这些方法在需要手动汇聚或分发信息时特别有用,具体用法可以参考 pytorch 官方文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1004016.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

paddleocr关闭log日志打印输出

问题背景 问题分析 可以看到paddleocr输出logging主要有两种&#xff0c;DEBUG和WARNING&#xff0c;因此关闭这两种打印日志即可。 解决方法 import logginglogging.disable(logging.DEBUG) # 关闭DEBUG日志的打印 logging.disable(logging.WARNING) # 关闭WARNING日志的…

Python二分查找详解

在计算机科学中&#xff0c;二分查找算法&#xff08;英语&#xff1a;binary search algorithm&#xff09;&#xff0c;也称折半搜索算法&#xff08;英语&#xff1a;half-interval search algorithm&#xff09;、对数搜索算法&#xff08;英语&#xff1a;logarithmic sea…

Pyinstaller打包EXE时添加版本信息、作者信息并在运行时读取外部配置文件

&#x1f9d1;‍&#x1f4bb;作者名称&#xff1a;DaenCode &#x1f3a4;作者简介&#xff1a;CSDN实力新星&#xff0c;后端开发两年经验&#xff0c;曾担任甲方技术代表&#xff0c;业余独自创办智源恩创网络科技工作室。会点点Java相关技术栈、帆软报表、低代码平台快速开…

亚马逊气候友好认证

“气候友好承诺认证”是亚马逊推出的Climate Pledge Friendly新环保计划。目的是帮助有环境意识、注重绿能的买家更好地找到带认证产品&#xff0c;品牌可以通过确保在亚马逊上销售的产品至少在可持续发展的一个方面得到认证&#xff0c;来参与并为他们的产品获得一个气候友好承…

ORA-32771 cannot add file to bigfile tablespace

ORA-32771 cannot add file to bigfile tablespace 扩容表空间报错 原因 oracle中有两种表空间类型BIGFILE 大文件表空间&#xff1a;只能包含1个大文件(最大尺寸为128 TB) SMALLFILE 小文件表空间&#xff1a;可包含多个数据文件(默认)表空间在创建的时候就会确定好&#xf…

OpenCV(三十八):二维码检测

1.二维码识别原理 功能图形&#xff1a; 位置探测图形&#xff1a;通常&#xff0c;二维码中有三个位置探测图形&#xff0c;呈现L型或大角度十字架形状&#xff0c;分布在二维码的三个角上&#xff0c;用于帮助扫描设备定位二维码的位置和方向。 位置探测图形分隔符&#xf…

2023工博会,正运动机器视觉运动控制一体机应用预览(二)

展会倒计时&#xff1a;7天 本次的中国国际工业博览会正运动技术将携高性能x86平台Windows实时视觉运动控制器VPLC711亮相。 •运动控制机器视觉一站式开发&#xff0c;缩短开发周期&#xff0c;降低硬件成本&#xff1b; •可替代传统的工控机运动控制卡/PLC视觉软件的自动化…

Fultter学习日志(2)-构建第一个flutter应用

依照上一篇中我们新建的flutter应用 让我们更改pubspec.yaml中的内容为 name: namer_app description: A new Flutter project.publish_to: none # Remove this line if you wish to publish to pub.devversion: 0.0.11environment:sdk: >2.19.4 <4.0.0dependencies:fl…

SpringBoot整合Easy-ES实现对ES操作

请确保已有可用的ES&#xff0c;若没有&#xff0c;请移步&#xff1a;Docker安装部署ElasticSearch&#xff08;ES&#xff09; 新建SpringBoot项目 这里是用的springboot版本是2.6.0 引入依赖 <!-- 排除springboot中内置的es依赖,以防和easy-es中的依赖冲突--><…

0基础学习VR全景平台篇 第99篇:百度地图如何上传全景图

蛙色平台现已打通VR全景入驻百度地图全流程&#xff0c;百度全景分为免费版和付费版两种&#xff0c;其中付费支持配置作品音乐、场景漫游热点、联系电话、描述信息。 百度地图上传案例 免费版 付费版 一、百度地图上传流程 1、进入蛙色VR账号后台 &#xff08;1&#xff…

Fiddler抓取HTTPS 详解

对于想抓取HTTPS的测试初学者来说&#xff0c;常用的工具就是fiddler。 但是初学时&#xff0c;大家对于fiddler如何抓取HTTPS难免走歪路&#xff0c;也许你一步步按着网上的帖子成功了&#xff0c;这自然是极好的。 但也有可能没那么幸运&#xff0c;这时候你就会很抓狂。 …

华为云云服务器云耀L实例评测 | 华为云云服务器实例新品全面解析

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

分享一下在微信商城上怎么可以快速实现分销功能

微信商城上的分销功能是一种吸引更多用户和提升销售的方式&#xff0c;通过搭建一个以分销为主的平台&#xff0c;商家可以借助用户的力量来推广自己的产品或服务。下面将介绍在微信商城上如何实现分销功能&#xff0c;包括分销模式的选择、开发流程和推广方法。 一、选择适合的…

BUG定位分析方法

作为测试人员&#xff0c;接触最多的就是bug&#xff0c;怎样才能体现出测试人的专业性&#xff1f;能够精准的定位并分析bug一定是你的加分项。 在什么地方干了什么产生了什么结果&#xff0c;和期望的结果不一致&#xff0c;那么这就是一个bug。人人都能找出bug&#xff0c;…

【SpringMVC】JSR 303与拦截器注释使用

目录 一、JSR 303 1.1 JSR 303介绍 1.2 为什么要使用JSR-303 1.3 常用注解 1.4 使用示例 1.4.1 导入JSR303依赖 1.4.2 配置校验规则 1.4.3 编写方法校验 1.4.4 编写前端 1.4.5 测试 ​编辑 1.5 Validated与Valid区别 二、拦截器&#xff08;interceptor&#xff09…

SpringBoot2.0入门(详细文档)

文章目录 Springboot是什么Springboot2.x依赖环境和版本新特性说明为什么学习Springboot从springboot优点来看从未来发展的趋势来看 开发环境Spring Boot开发环境搭建和项目启动jdk 的配置Spring Boot 工程的构建maven配置IDEA 快速构建maven 创建工程常用注解 完整代码 Spring…

延迟win11的更新

自从升级到win11之后&#xff0c;发现更新插件的频率高得有点过分,每隔几天就是一次. 我看网上有人能把更新时间延迟几十万年,所以我心动了! 我试了一下:成功延迟到2099年 创建一个文本文件&#xff0c;命名为&#xff1a;“stopupdate.reg”&#xff0c;然后用记事本或者代码…

SpringMVC之入门:springmcx工作流程,springmvc的入门,静态资源处理器

springmvc工作流程springmvc的入门静态资源处理 1.springmvc工作流程 自定义mvc流程&#xff1a;1.客户端浏览器发送请求url http://localhost:8080/mvc/book.action?methodNamelist 2.被中央控制器dispatchServlet接收 *.action 将url处理&#xff0c;截取得到 *(/book) 3.通…

水循环原理VR实景教学课件开发

日本核污水排海让人们越来越重视海洋大气层水循环的安全&#xff0c;水循环是一个周而复始、循环往复的动态过程&#xff0c;为了将水循环过程以形象、生动地形式展示出来&#xff0c;水循环VR全景动态演示逐渐受到大家青睐。 传统的水循环教育方式通常是通过图片、动画或实地考…

python机器人编程——用python实现一个写字机器人

目录 一、前言二、整体框架2.1 系统构成2.2 硬件介绍2.2.1主要组成部分2.2.2机械结构2.2.3驱动及控制主板PS电机驱动原理简介: 2.2.4其余部分 2.3 机器人python程序框架2.3.1通信服务模块2.3.2消息处理模块2.3.3轨迹解析模块2.3.4机械臂逆解模块2.3.5写字板模块 三、机械臂的建…