Pytorch 多卡并行(1)—— 原理简介和 DDP 并行实践

news2024/11/19 5:57:57
  • 近年来,深度学习模型的规模越来越大,需要处理的数据也越来越多,单卡训练的显存空间和计算效率都越来越难以满足需求。因此,多卡并行训练成为了一个必要的解决方案
  • 本文主要介绍使用 Pytorch 的 DistributedDataParallel(DDP)库进行分布式数据并行训练的方法

文章目录

  • 1. 多卡并行简介
    • 1.1 两种并行形式
    • 1.2 Pytorch 中的多卡并行
  • 2. 使用 DDP 进行单机多卡训练
    • 2.1 原理概述
    • 2.2 使用 DDP 改写单卡训练代码

1. 多卡并行简介

  • 多卡并行训练主要用于解决以下几个问题:
    1. 相同 batch size 下加速训练:多卡并行可以将数据分为多份同时在不同的GPU上运行,从而大大加快训练速度
    2. 相同速度下使用更大的 batch size:多卡并行可以在多个GPU之间共享显存,允许我们设置更大的 batch size
    3. 增加可训练的模型规模:有些模型参数多到单卡训练无法承受,而多卡并行可以将模型放入多个GPU中,从而扩充可训练模型的规模

1.1 两种并行形式

  • 多卡并行训练有数据并行和模型并行两种形式
    在这里插入图片描述

    1. 数据并行:每个GPU都保存一个模型副本,训练数据划分成多份交给各个GPU计算梯度,然后汇总梯度更新模型参数。根据梯度汇总的方式,数据并行又可以分成 Parameter ServerRing All-Reduce 两种,前者使用一个 master GPU 汇总梯度更新参数,再将参数分发给各个模型;后者以环的形式互相传递梯度,每个GPU都维护一个优化器,各自汇总梯度并自行更新模型参数。Ring All-Reduce 方案能更高效地利用所有卡的上下行带宽,是目前的主流方案
      在这里插入图片描述

    2. 模型并行:将模型切分成多个部分放在不同的GPU上并行运行,每个GPU负责处理一部分模型参数,并将处理后的结果发送到其它GPU进行合并,从而实现整体模型的更新。这种操作目前并不常见,一是因为大部分模型单卡都放得下,二是因为通讯开销比数据并行多。根据模型切分方式,模型并行也可以分成 Pipelined ParallelismTensor Slicing 两种,前者将模型的各个层放到不同的 GPU 上运行,这种做法比较通用,但是效率不高;后者针对模型中各种模块(attention、FFN 等)的张量计算操作进行拆解,把 tensor 计算分块分散到不同的机器上进行并行,效率较高但是通用性差
      在这里插入图片描述

  • 关于各种并行方法的详细说明可以参考:分布式训练、混合精度训练、梯度累加…一文带你优雅地训练大型模型

1.2 Pytorch 中的多卡并行

  • 随着各种深度学习框架的日趋完善,很多并行方法已经被整合其中,这让实现多卡并行加速训练变得相对简单。Pytorch 中提供了 DP(DataParallel) 和 DDP(DistributedDataParallel) 两种数据并行方法,它们的性能对比如下
    在这里插入图片描述
    红色柱子是 DP,绿色柱子是 DDP,蓝色柱子是 DDP + Apex 混合精度训练。注意到 DDP 的表现大幅优于 DP,这是因为
    1. DP 使用 Parameter Server 方式汇聚梯度并更新参数,主卡计算负载和通信带宽需求相比其他卡都显著高,导致主卡的计算能力和上下行带宽成为性能瓶颈;
    2. DDP 使用更高效的 Ring All-Reduce 方案,基本实现了 “使用几块GPU就是几倍加速” 的效果
  • 接下来本文会介绍使用 DDP 进行多卡加速的具体做法,参考自:Pytorch 官方教程

2. 使用 DDP 进行单机多卡训练

2.1 原理概述

  • DDP 会在每个 GPU 上运行一个进程,每个进程中都有一套完全相同的 Trainer 副本(包括 model 和 optimizer),各个进程之间通过一个进程池进行通信。这里有几个术语
    1. node:多机多卡运行时,每个机器称为一个 “node”,其中每一张卡都可以运行一个并行进程
    2. world size:所有并行进程的总数,各个 node 上并行的GPU总数
    3. rank:所有 node 的所有进程中,各个进程的标识符号,是从0开始计数的整数
    4. local rank:当前 node 的所有进程中,各个进程的标识符号,是从0开始计数的整数
    5. group:所有并行的进程组成一个 group(进程池),只有组内的进程间才可以相互通信

2.2 使用 DDP 改写单卡训练代码

  • 考虑如何将以下单机单卡代码改为 DDP 单机多卡运行

    # 单 GPU 训练示例
    import torch
    import torch.nn.functional as F
    from torch.utils.data import Dataset, DataLoader
    
    class Trainer:
        def __init__(
            self,
            model: torch.nn.Module,
            train_data: DataLoader,
            optimizer: torch.optim.Optimizer,
            gpu_id: int,
            save_every: int, 
        ) -> None:
            self.gpu_id = gpu_id
            self.model = model.to(gpu_id)
            self.train_data = train_data
            self.optimizer = optimizer
            self.save_every = save_every
    
        def _run_batch(self, source, targets):
            self.optimizer.zero_grad()
            output = self.model(source)
            loss = F.cross_entropy(output, targets)
            loss.backward()
            self.optimizer.step()
    
        def _run_epoch(self, epoch):
            b_sz = len(next(iter(self.train_data))[0])
            print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
            for source, targets in self.train_data:
                source = source.to(self.gpu_id)
                targets = targets.to(self.gpu_id)
                self._run_batch(source, targets)
    
        def _save_checkpoint(self, epoch):
            ckp = self.model.state_dict()
            PATH = "checkpoint.pt"
            torch.save(ckp, PATH)
            print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")
    
        def train(self, max_epochs: int):
            for epoch in range(max_epochs):
                self._run_epoch(epoch)
                if epoch % self.save_every == 0:
                    self._save_checkpoint(epoch)
    
    class MyTrainDataset(Dataset):
        def __init__(self, size):
            self.size = size
            self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)]
    
        def __len__(self):
            return self.size
        
        def __getitem__(self, index):
            return self.data[index]
    
    def load_train_objs():
        train_set = MyTrainDataset(2048)  # load your dataset
        model = torch.nn.Linear(20, 1)  # load your model
        optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
        return train_set, model, optimizer
    
    def prepare_dataloader(dataset: Dataset, batch_size: int):
        return DataLoader(
            dataset,
            batch_size=batch_size,
            pin_memory=True,
            shuffle=True
        )
    
    def main(device, total_epochs, save_every, batch_size):
        dataset, model, optimizer = load_train_objs()
        train_data = prepare_dataloader(dataset, batch_size)
        trainer = Trainer(model, train_data, optimizer, device, save_every)
        trainer.train(total_epochs)
    
    if __name__ == "__main__":
        import argparse
        parser = argparse.ArgumentParser(description='simple distributed training job')
        parser.add_argument('--total-epochs', type=int, default=50, help='Total epochs to train the model')
        parser.add_argument('--save-every', type=int, default=10, help='How often to save a snapshot')
        parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
        args = parser.parse_args()
        
        device = 0  # shorthand for cuda:0
        main(device, args.total_epochs, args.save_every, args.batch_size)
    
  • 将单卡训练代码改写为 DDP 并行的要点如下

    1. 引入 DDP 相关库
      # 使用 DistributedDataParallel 进行单机多卡训练
      import torch
      import torch.nn.functional as F
      from torch.utils.data import Dataset, DataLoader
      import os
      
      # 对 python 多进程的一个 pytorch 包装,用于后续分发进程
      import torch.multiprocessing as mp
      # 这个 sampler 可以把采样的数据分散到各个 CPU 上                                      
      from torch.utils.data.distributed import DistributedSampler     
      # 实现分布式数据并行的核心类        
      from torch.nn.parallel import DistributedDataParallel as DDP         
      # 各个进程之间通过一个进程池进行通信,这两个方法来初始化和销毁进程池
      from torch.distributed import init_process_group, destroy_process_group 
      
    2. 在程序入口初始化进程池;在程序出口销毁进程池
      def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
          # 初始化进程池
          ddp_setup(rank, world_size)
      
          # 进行训练
          dataset, model, optimizer = load_train_objs()
          train_data = prepare_dataloader(dataset, batch_size)
          trainer = Trainer(model, train_data, optimizer, rank, save_every)
          trainer.train(total_epochs)
         
          # 销毁进程池
          destroy_process_group()
      
    3. 使用 DistributedDataParallel 包装模型,这样模型才能在各个进程间同步参数
      self.model = DDP(model, device_ids=[gpu_id])    # model 要用 DDP 包装一下
      
      包装后 model 变成了一个 DDP 对象,要访问其参数得这样写 self.model.module.state_dict()
    4. 构造 Dataloader 时使用 DistributedSampler 作为 sampler,这个采样器可以自动将数量为 batch_size 的数据分发到各个GPU上,并保证数据不重叠
      def prepare_dataloader(dataset: Dataset, batch_size: int):
          return DataLoader(
              dataset,
              batch_size=batch_size,
              pin_memory=True,
              shuffle=False,                      # 设置了新的 sampler,参数 shuffle 要设置为 False 
              sampler=DistributedSampler(dataset) # 这个 sampler 自动将数据分块后送个各个 GPU,它能避免数据重叠
          )
      
      注意需要在各 epoch 入口调用该 sampler 对象的 set_epoch() 方法,否则每个 epoch 加载的样本顺序都不变
    5. 运行过程中单独控制某个进程进行某些操作,比如要想保存 ckpt,由于每张卡里都有完整的模型参数,所以只需要控制一个进程保存即可。需要注意的是:使用 DDP 改写的代码会在每个 GPU 上各自运行,因此需要在程序中获取当前 GPU 的 rank(gpu_id),这样才能对针对性地控制各个 GPU 的行为
      if self.gpu_id == 0 and epoch % self.save_every == 0:
      	self._save_checkpoint(epoch)
      
    6. 使用 torch.multiprocessing.spawn 方法将代码分发到各个 GPU 的进程中执行
      # 利用 mp.spawn,在整个 distribution group 的 nprocs 个 GPU 上生成进程来执行 fn 方法,并能设置要传入 fn 的参数 args
      # 注意不需要传入 fn 的 rank 参数,它由 mp.spawn 自动分配
      world_size = torch.cuda.device_count()
      mp.spawn(
          fn=main, 
          args=(world_size, args.save_every, args.total_epochs, args.batch_size), 
          nprocs=world_size
      )
      
  • 完整的修改版代码如下,请参考注释自行对比

    # 使用 DistributedDataParallel 进行单机多卡训练
    import torch
    import torch.nn.functional as F
    from torch.utils.data import Dataset, DataLoader
    import os
    
    # 对 python 多进程的一个 pytorch 包装
    import torch.multiprocessing as mp
    
    # 这个 sampler 可以把采样的数据分散到各个 CPU 上                                      
    from torch.utils.data.distributed import DistributedSampler     
    
    # 实现分布式数据并行的核心类        
    from torch.nn.parallel import DistributedDataParallel as DDP         
    
    # DDP 在每个 GPU 上运行一个进程,其中都有一套完全相同的 Trainer 副本(包括model和optimizer)
    # 各个进程之间通过一个进程池进行通信,这两个方法来初始化和销毁进程池
    from torch.distributed import init_process_group, destroy_process_group 
    
    
    def ddp_setup(rank, world_size):
        """
        setup the distribution process group
    
        Args:
            rank: Unique identifier of each process
            world_size: Total number of processes
        """
        # MASTER Node(运行 rank0 进程,多机多卡时的主机)用来协调各个 Node 的所有进程之间的通信
        os.environ["MASTER_ADDR"] = "localhost" # 由于这里是单机实验所以直接写 localhost
        os.environ["MASTER_PORT"] = "12355"     # 任意空闲端口
        init_process_group(
            backend="nccl",                     # Nvidia CUDA CPU 用这个 "nccl"
            rank=rank,                          
            world_size=world_size
        )
        torch.cuda.set_device(rank)
    
    class Trainer:
        def __init__(
            self,
            model: torch.nn.Module,
            train_data: DataLoader,
            optimizer: torch.optim.Optimizer,
            gpu_id: int,
            save_every: int,
        ) -> None:
            self.gpu_id = gpu_id
            self.model = model.to(gpu_id)
            self.train_data = train_data
            self.optimizer = optimizer
            self.save_every = save_every                    # 指定保存 ckpt 的周期
            self.model = DDP(model, device_ids=[gpu_id])    # model 要用 DDP 包装一下
    
        def _run_batch(self, source, targets):
            self.optimizer.zero_grad()
            output = self.model(source)
            loss = F.cross_entropy(output, targets)
            loss.backward()
            self.optimizer.step()
    
        def _run_epoch(self, epoch):
            b_sz = len(next(iter(self.train_data))[0])
            print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
            self.train_data.sampler.set_epoch(epoch)        # 在各个 epoch 入口调用 DistributedSampler 的 set_epoch 方法是很重要的,这样才能打乱每个 epoch 的样本顺序
            for source, targets in self.train_data: 
                source = source.to(self.gpu_id)
                targets = targets.to(self.gpu_id)
                self._run_batch(source, targets)
    
        def _save_checkpoint(self, epoch):
            ckp = self.model.module.state_dict()            # 由于多了一层 DDP 包装,通过 .module 获取原始参数 
            PATH = "checkpoint.pt"
            torch.save(ckp, PATH)
            print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")
    
        def train(self, max_epochs: int):
            for epoch in range(max_epochs):
                self._run_epoch(epoch)
                # 各个 GPU 上都在跑一样的训练进程,这里指定 rank0 进程保存 ckpt 以免重复保存
                if self.gpu_id == 0 and epoch % self.save_every == 0:
                    self._save_checkpoint(epoch)
    
    class MyTrainDataset(Dataset):
        def __init__(self, size):
            self.size = size
            self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)]
    
        def __len__(self):
            return self.size
        
        def __getitem__(self, index):
            return self.data[index]
    
    def load_train_objs():
        train_set = MyTrainDataset(2048)  # load your dataset
        model = torch.nn.Linear(20, 1)  # load your model
        optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
        return train_set, model, optimizer
    
    
    def prepare_dataloader(dataset: Dataset, batch_size: int):
        return DataLoader(
            dataset,
            batch_size=batch_size,
            pin_memory=True,
            shuffle=False,                      # 设置了新的 sampler,参数 shuffle 要设置为 False 
            sampler=DistributedSampler(dataset) # 这个 sampler 自动将数据分块后送个各个 GPU,它能避免数据重叠
        )
    
    
    def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
        # 初始化进程池
        ddp_setup(rank, world_size)
    
        # 进行训练
        dataset, model, optimizer = load_train_objs()
        train_data = prepare_dataloader(dataset, batch_size)
        trainer = Trainer(model, train_data, optimizer, rank, save_every)
        trainer.train(total_epochs)
       
        # 销毁进程池
        destroy_process_group()
    
    
    if __name__ == "__main__":
        import argparse
        parser = argparse.ArgumentParser(description='simple distributed training job')
        parser.add_argument('--total-epochs', type=int, default=50, help='Total epochs to train the model')
        parser.add_argument('--save-every', type=int, default=10, help='How often to save a snapshot')
        parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
        args = parser.parse_args()
        
        world_size = torch.cuda.device_count()
        
        # 利用 mp.spawn,在整个 distribution group 的 nprocs 个 GPU 上生成进程来执行 fn 方法,并能设置要传入 fn 的参数 args
        # 注意不需要 fn 的 rank 参数,它由 mp.spawn 自动分配
        mp.spawn(
            fn=main, 
            args=(world_size, args.save_every, args.total_epochs, args.batch_size), 
            nprocs=world_size
        )
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1001258.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java开发分布式抽奖系统

Lottery 基于Springboot,Dubbo 等开发的分布式抽奖系统 1. 环境 配置 规范 2. 搭建(DDD RPC)架构 DDD(Domain-Driven Design 领域驱动设计)是由Eric Evans最先提出,目的是对软件所涉及到的领域进行建模,以应对系统…

阿里云轻量应用服务器为什么便宜?CPU性能差吗?

阿里云轻量应用服务器2核2G3M带宽优惠价108元一年,轻量应用服务器为什么便宜?是因为性能差吗?并不是,轻量应用服务器不限制CPU基准性能,轻量有月流量限制,轻量不支持指定CPU处理器,阿里云轻量2核…

研发效能行业发展趋势及人才认证的重要性丨IDCF

在当今的高科技环境下,“研发效能”已成为企业竞争和发展的关键因素。近年来,随着技术的快速发展,研发效能行业也在不断演进,并且将在未来几年内持续发展。本文将探讨研发效能行业的发展趋势以及人才认证的重要性。 一、研发效能…

便捷查询中通快递,详细物流信息轻松获取

在如今快节奏的生活中,快递已成为人们生活中不可或缺的一部分。然而,快递查询却常常让人头疼,因为需要分别在不同的快递公司官网上进行查询,耗费时间和精力。为了解决这个问题,固乔科技推出了一款便捷的快递查询助手&a…

Deepface使用教程

一:github地址 GitHub - iperov/DeepFaceLab: DeepFaceLab is the leading software for creating deepfakes. GitHub - iperov/DeepFaceLive: Real-time face swap for PC streaming or video calls DeepFaceLab为视频处理。 DeepFaceLabLive为直播版本。 此处C…

vue3中css使用script中定义的变量

代码 <template><div class"box">haha</div> </template><script setup lang"ts"> const boxWidth 500px </script><style lang"scss"> .box {width: v-bind(boxWidth);height: 200px;background-c…

Allegro166版本如何在颜色管理器中实时显示层面操作指导

Allegro166版本如何在颜色管理器中实时显示层面操作指导 在用Allegro166进行PCB设计的时候,需要在颜色管理器中频繁的开关层面。但是166不像172一样在颜色管理器中可以实时的开关层面,如下图 需要打开Board Geometry/Soldermask_top层,首先需要勾选这个层面,再点击Apply即…

2023年车载超声波雷达行业研究报告

第一章 行业概况 车载超声波雷达&#xff0c;通常在英文中被称为“Automotive Ultrasonic Radar”或“Automotive Ultrasonic Sensor”&#xff0c;是一种使用超声波来检测车辆周围物体的距离的传感器。这个行业主要关注的是为汽车提供停车辅助、防撞和其他安全功能。 图 超声…

GDB调试方法汇总

gcc常用选项 选项含义描述-o filename指定输出文件名&#xff0c;在编译目标代码时&#xff0c;可不选&#xff0c;不指定filename时&#xff0c;默认文件名是a.out-c只编译不链接&#xff0c;生成目标文件.o-S只编译不汇编&#xff0c;生成汇编代码-E只进行预编译&#xff0c…

MyBatis:The error occurred while setting parameters;foreach语句不生效

根本原因就是在参数上&#xff0c;列举一下可能的原因&#xff1a; 1.sql语句中的传的参数类型和数据库中不一致 2.#{}写成${} 3.也有说是在sql语句后加了“&#xff1b;”有影响的 本人sql语句如下&#xff1a; 该条语句的参数是list&#xff0c;list中存着试卷对象&#xff…

雅思口语回答拿不准,chatgpt担任考官帮你润色文本

目录 你现在是一位雅思口语考官&#xff0c;请你现在以雅思口语的评价标准&#xff0c;来对于我的回答进行打分 问题是“Do you like wearing T-shirts?”&#xff0c;我即将在下一句话中进行回答&#xff0c;请你进行评价&#xff0c;请等待我的回答&#xff0c;请等待我的回…

【Linux】编辑器 vim

1、vim的基本概念 vi/vim【一款文本编辑器】vim【一款多模式编辑器】vi/vim 的区别简单点来说&#xff0c;它们都是多模式编辑器&#xff0c;不同的是 vim 是 vi 的升级版本&#xff0c;它不仅兼容vi的所有指令&#xff0c;而且还有一些新的特性在里面。例如语法加亮&#xff0…

Android高德地图截屏功能(可包含自定义控件)

一、不包含自定义控件 地图 SDK 支持对当前屏幕显示区域进行截屏&#xff0c;可以对地图、覆盖物&#xff08;包含信息窗口&#xff09;、Logo进行截取屏幕&#xff0c;这其中不包括地图控件、Toast窗口。 详细示例如下&#xff1a; // 对地图进行截屏aMap!!.getMapScreenSho…

ajax day3

3、将普通对象转为查询参数字符串形式&#xff1a; 创建URLSearchParams参数&#xff0c;再用toString方法转为字符串 4、xhr对象 请求参数&#xff1a;body参数 5、promise promise对象一旦被兑现或拒绝&#xff0c;就是已敲定了&#xff0c;状态无法再被改变。故此处先执行…

基于Python和mysql开发的智慧校园答题考试系统(源码+数据库+程序配置说明书+程序使用说明书)

一、项目简介 本项目是一套基于Python和mysql开发的智慧校园答题考试系统&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的Python学习者。 包含&#xff1a;项目源码、项目文档、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都…

第十章 数组和指针

本章介绍以下内容&#xff1a; 关键字&#xff1a;static 运算符&#xff1a;&、*&#xff08;一元&#xff09; 如何创建并初始化数组 指针&#xff08;在已学过的基础上&#xff09;、指针和数组的关系 编写处理数组的函数 二维数组 人们通常借助计算机完成统计每月的支出…

konisGraph学习。复杂查询优化记录

最近有需求是查两个公司之间的投资关系 比如 a和b之间有哪些直接投资和间接投资。 例如 a->b a->e->b a->c->d->b b->f->a 需求是查出7跳以内的ab之间的投资关系 v的标签是company_name,属性是company_name ,eid 其中ideid e的标签是inv…

恒林家居引入纷享销客CRM系统,领跑家居行业营销数字化进程

近日&#xff0c;恒林家居股份有限公司&#xff08;&#xff08;股票代码&#xff1a;603661以下简称为“恒林家居”&#xff09;携手纷享销客在湖州召开了CRM项目启动会。双方领导及核心项目人员齐聚一堂&#xff0c;展开了深度交流并达成了重要共识。 作为家居行业的领军企业…

查看创建好的数据库

MySQL从小白到总裁完整教程目录:https://blog.csdn.net/weixin_67859959/article/details/129334507?spm1001.2014.3001.5502 语法格式: show create database 数据库名称; 案列:查看testing数据库信息 mysql> show create database testing; ------------------------…

源代码防泄密和工控安全方案简介

客户情况 某新能源电池企业专业从事于新能源锂离子动力电池和储能电池的研发、生产和销售&#xff0c;具备电芯、模组、BMS及Pack的完整资源开发能力。公司致力于通过持续不断地改进电池技术&#xff0c;为全球锂离子动力和储能领域提供数字化精准高效的新能源解决方案。 该企…