【PyTorch教程】如何使用PyTorch分布式并行模块DistributedDataParallel(DDP)进行多卡训练

news2024/12/31 6:07:29

本期目录

  • 1. 导入核心库
  • 2. 初始化分布式进程组
  • 3. 包装模型
  • 4. 分发输入数据
  • 5. 保存模型参数
  • 6. 运行分布式训练
  • 7. DDP完整训练代码


  • 本章的重点是学习如何使用 PyTorch 中的 Distributed Data Parallel (DDP) 库进行高效的分布式并行训练。以提高模型的训练速度。

1. 导入核心库

  • DDP 多卡训练需要导入的库有:

    作用
    torch.multiprocessing as mp原生Python多进程库的封装器
    from torch.utils.data.distributed import DistributedSampler上节所说的DistributedSampler,划分不同的输入数据到GPU
    from torch.nn.parallel import DistributedDataParallel as DDP主角,核心,DDP 模块
    from torch.distributed import init_process_group, destroy_process_group两个函数,前一个初始化分布式进程组,后一个销毁分布式进程组

2. 初始化分布式进程组

  • Distributed Process Group 分布式进程组。它包含在所有 GPUs 上的所有的进程。因为 DDP 是基于多进程 (multi-process) 进行并行计算,每个 GPU 对应一个进程,所以必须先创建并定义进程组,以便进程之间可以互相发现并相互通信。

  • 首先来写一个函数 ddp_setup()

    import torch
    import os
    from torch.utils.data import Dataset, DataLoader
    
    # 以下是分布式DDP需要导入的核心库
    import torch.multiprocessing as mp
    from torch.utils.data.distributed import DistributedSampler
    from torch.nn.parallel import DistributedDataParallel as DDP
    from torch.distributed import init_process_group, destroy_process_group
    
    
    # 初始化DDP的进程组
    def ddp_setup(rank, world_size):
        os.environ["MASTER_ADDR"] = "localhost"
        os.environ["MASTER_PORT"] = "12355"
        init_process_group(backend="nccl", rank=rank, world_size=world_size)
    
  • 其包含两个入参:

    入参含义
    rank进程组中每个进程的唯一 ID,范围是[0, world_size-1]
    world_size一个进程组中的进程总数
  • 在函数中,我们首先来设置环境变量:

    环境变量含义
    MASTER_ADDR在rank 0进程上运行的主机的IP地址。单机训练直接写 “localhost” 即可
    MASTER_PORT主机的空闲端口,不与系统端口冲突即可

    之所以称其为主机,是因为它负责协调所有进程之间的通信。

  • 最后,我们调用 init_process_group() 函数来初始化默认分布式进程组。其包含的入参如下:

    入参含义
    backend后端,通常是 nccl ,NCCL 是Nvidia Collective Communications Library,即英伟达集体通信库,用于 CUDA GPUs 之间的分布式通信
    rank进程组中每个进程的唯一ID,范围是[0, world_size-1]
    world_size一个进程组中的进程总数
  • 这样,进程组的初始化函数就准备好了。

【注意】

  • 如果你的神经网络模型中包含 BatchNorm 层,则需要将其修改为 SyncBatchNorm 层,以便在多个模型副本中同步 BatchNorm 层的运行状态。(你可以调用 torch.nn.SyncBatchNorm.convert_sync_batchnorm(model: torch.nn.Module) 函数来一键把神经网络中的所有 BatchNorm 层转换成 SyncBatchNorm 层。)

3. 包装模型

  • 训练器的写法有一处需要注意,在开始使用模型之前,我们需要使用 DDP 去包装我们的模型:

    self.model = DDP(self.model, device_ids=[gpu_id])
    
  • 入参除了 model 以外,还需要传入 device_ids: List[int] or torch.device ,它通常是由 model 所在的主机的 GPU ID 所组成的列表,


4. 分发输入数据

  • DistributedSampler 在所有分布式进程中对输入数据进行分块,确保输入数据不会出现重叠样本。

  • 每个进程将接收到指定 batch_size 大小的输入数据。例如,当你指定了 batch_size 为 32 时,且你有 4 张 GPU ,那么有效的 batch size 为:
    32 × 4 = 128 32 \times 4 = 128 32×4=128

    train_loader = torch.utils.data.DataLoader(
    	dataset=train_set,
        batch_size=32,
        shuffle=False,	# 必须关闭洗牌
        sampler=DistributedSampler(train_set)	# 指定分布式采样器
    )
    
  • 然后,在每轮 epoch 的一开始就调用 DistributedSamplerset_epoch(epoch: int) 方法,这样可以在多个 epochs 中正常启用 shuffle 机制,从而避免每个 epoch 中都使用相同的样本顺序。

    def _run_epoch(self, epoch: int):
        b_sz = len(next(iter(self.train_loader))[0])
        self.train_loader.sampler.set_epoch(epoch)	# 调用
        for x, y in self.train_loader:
            ...
            self._run_batch(x, y)
    

5. 保存模型参数

  • 由于我们前面已经使用 DDP(model) 包装了模型,所以现在 self.model 指向的是 DDP 包装的对象而不是 model 模型对象本身。如果此时我们想读取模型底层的参数,则需要调用 model.module

  • 由于所有 GPU 进程中的神经网络模型参数都是相同的,所以我们只需从其中一个 GPU 进程那儿保存模型参数即可。

    ckp = self.model.module.state_dict()	# 注意需要添加.module
    ...
    ...
    if self.gpu_id == 0 and epoch % self.save_step == 0:	# 从gpu:0进程处保存1份模型参数
        self._save_checkpoint(epoch)
    

6. 运行分布式训练

  • 包含 2 个新的入参 rank (代替 device) 和 world_size

  • 当调用 mp.spawn 时,rank 参数会被自动分配。

  • world_size 是整个训练过程中的进程数量。对 GPU 训练来说,指的是可使用的 GPU 数量,且每张 GPU 都只运行 1 个进程。

    def main(rank: int, world_size: int, total_epochs: int, save_step: int):
        ddp_setup(rank, world_size)	# 初始化分布式进程组
        train_set, model, optimizer = load_train_objs()
        train_loader = prepare_dataloader(train_set, batch_size=32)
        trainer = Trainer(
            model=model,
            train_loader=train_loader,
            optimizer=optimizer,
            gpu_id=rank,	# 这里变了
            save_step=save_step
        )
        trainer.train(total_epochs)
        destroy_process_group()	# 最后销毁进程组
        
    if __name__ == "__main__":
        import sys
        total_epochs = int(sys.argv[1])
        save_step = int(sys.argv[2])
        world_size = torch.cuda.device_count()
        mp.spawn(main, args=(world_size, total_epochs, save_step), nprocs=world_size)
    
  • 这里调用了 torch.multiprocessingspawn() 函数。该函数的主要作用是在多个进程中执行指定的函数,每个进程都在一个独立的 Python 解释器中运行。这样可以避免由于 Python 全局解释器锁 (GIL) 的存在而限制多线程并发性能的问题。在分布式训练中,通常每个 GPU 或计算节点都会运行一个独立的进程,通过进程之间的通信实现模型参数的同步梯度聚合

  • 可以看到调用 spawn() 函数时,传递 args 参数时并没有传递 rank ,这是因为会自动分配,详见下方表格 fn 入参介绍。

    入参含义
    fn: function每个进程中要执行的函数。该函数会以 fn(i, *args) 的形式被调用,其中 i 是由系统自动分配的唯一进程 ID ,args 是传递给该函数的参数元组
    args: tuple要传递给函数 fn 的参数
    nprocs: int要启动的进程数量
    join: bool是否等待所有进程完成后再继续执行主进程 (默认值为 True)
    daemon: bool是否将所有生成的子进程设置为守护进程 (默认为 False)

7. DDP完整训练代码

首先,创建了一个训练器 Trainer 类。

import torch
import os
from torch.utils.data import Dataset, DataLoader

# 以下是分布式DDP需要导入的核心库
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group


# 初始化DDP的进程组
def ddp_setup(rank: int, world_size: int):
    """
    Args:
    	rank: Unique identifier of each process.
    	world_size: Total number of processes.
    """
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
    
 
class Trainer:
    def __init__(
        self,
        model: torch.nn.Module,
        train_loader: DataLoader,
        optimizer: torch.optim.Optimizer,
        gpu_id: int,
        save_step: int	# 保存点(以epoch计)
    ) -> None:
        self.gpu_id = gpu_id,
        self.model = DDP(model, device_ids=[self.gpu_id])	# DDP包装模型
        self.train_loader = train_loader,
        self.optimizer = optimizer,
        self.save_step = save_step
        
	def _run_batch(self, x: torch.Tensor, y: torch.Tensor):
        self.optimizer.zero_grad()
        output = self.model(x)
        loss = torch.nn.CrossEntropyLoss()(output, y)
        loss.backward()
        self.optimizer.step()
        
	def _run_epoch(self, epoch: int):
    b_sz = len(next(iter(self.train_loader))[0])
    self.train_loader.sampler.set_epoch(epoch)	# 调用set_epoch(epoch)洗牌
    print(f'[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_loader)}')
    for x, y in self.train_loader:
        x = x.to(self.gpu_id)
        y = y.to(self.gpu_id)
        self._run_batch(x, y)
        
	def _save_checkpoint(self, epoch: int):
        ckp = self.model.module.state_dict()
        torch.save(ckp, './checkpoint.pth')
        print(f'Epoch {epoch} | Training checkpoint saved at ./checkpoint.pth')
        
	def train(self, max_epochs: int):
        for epoch in range(max_epochs):
            self._run_epoch(epoch)
            if self.gpu_id == 0 and epoch % self.save_step == 0:
                self._save_checkpoint(epoch)

然后,构建自己的数据集、数据加载器、神经网络模型和优化器。

def load_train_objs():
    train_set = MyTrainDataset(2048)
    model = torch.nn.Linear(20, 1)	# load your model
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    return train_set, model, optimizer

def prepare_dataloader(dataset: Dataset, batch_size: int):
    return DataLoader(
        dataset=dataset,
        batch_size=batch_size,
        shuffle=False,	# 必须关闭
        pin_memory=True,
     	sampler=DistributedSampler(dataset=train_set)	# 指定DistributedSampler采样器
    )

最后,定义主函数。

def main(rank: int, world_size: int, total_epochs: int, save_step: int):
    ddp_setup(rank, world_size)	# 初始化分布式进程组
    train_set, model, optimizer = load_train_objs()
    train_loader = prepare_dataloader(train_set, batch_size=32)
    trainer = Trainer(
        model=model,
        train_loader=train_loader,
        optimizer=optimizer,
        gpu_id=rank,	# 这里变了
        save_step=save_step
    )
    trainer.train(total_epochs)
    destroy_process_group()	# 最后销毁进程组
    
if __name__ == "__main__":
    import sys
    total_epochs = int(sys.argv[1])
    save_step = int(sys.argv[2])
    world_size = torch.cuda.device_count()
    mp.spawn(main, args=(world_size, total_epochs, save_step), nprocs=world_size)

至此,你就已经成功掌握了 DDP 分布式训练的核心用法了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1202829.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

双十一电视盒子哪个牌子好?测评工作室整理口碑电视盒子排名

在挑选电视盒子的时候,新手朋友们不知道从何下手,最近很多粉丝评论想要我们分享双11电视盒子推荐,于是我们根据用户的评价整理了目前口碑最好的电视盒子排名,给不懂电视盒子哪个牌子好的朋友们做个参考。 TOP 1、泰捷WEBOX WE40S电…

【python自动化】Playwright基础教程(五)事件操作②悬停输入清除精讲

【python自动化】Playwright基础教程(五)事件操作②悬停&输入&清除精讲 本章目录 文章目录 【python自动化】Playwright基础教程(五)事件操作②悬停&输入&清除精讲鼠标悬停 - hover鼠标悬停实战 输入内容 - fill输入内容实战清空内容实战 输入内容 - type模拟…

stm32超声波测距不准的解决方法(STM32 delay_us()产生1us)及stm32智能小车超声波测距代码(C语言版本)

首先要说明一下原理:使用stm32无法准确产生1us的时间,但是超声波测距一定要依赖时间,时间不准,距离一定不准,这是要肯定的,但是在不准确的情况下,要测量一个比较准确的时间,那么只能…

内网穿透的应用-如何使用CFImagehost搭建简洁易用的私人图床并公网访问

文章目录 1.前言2. CFImagehost网站搭建2.1 CFImagehost下载和安装2.2 CFImagehost网页测试2.3 cpolar的安装和注册 3.本地网页发布3.1 Cpolar临时数据隧道3.2 Cpolar稳定隧道(云端设置)3.3.Cpolar稳定隧道(本地设置) 4.公网访问测…

北斗卫星为油气管道安全保障提供可靠技术支持

北斗卫星为油气管道安全保障提供可靠技术支持 随着现代社会对能源需求的不断增长,油气管道成为了能源输送的重要通道。然而,油气管道的安全风险也日益凸显。为了及时掌握油气管道的运行状态并有效地监测其安全状况,北斗卫星技术为油气管道监测…

VMware 虚拟机启动后自动重启电脑 问题的2个解决办法

我遇到的问题: vm配置好了新的虚拟机,不过点击启动虚拟机的时候电脑自动重启。无线点击启动、无限重启! 如有帮助 请点赞收藏关注我!谢谢啦 如有转载请注明出处! 我的解决办法: 1编辑虚拟机设置 硬件选项…

解决npm报错Error: error:0308010C:digital envelope routines::unsupported

解决npm报错Error: error:0308010C:digital envelope routines::unsupported。 解决办法;终端执行以下命令(windows): set NODE_OPTIONS--openssl-legacy-provider然后再执行 npm命令成功:

阿里云服务器白嫖还是选99元特价主机?

阿里云服务器0元到手攻略,咱别0元了,花点钱吧,99元行吗?不行也有0元的,一会阿腾云atengyun.com给大家细说。先说99元即可购买一台阿里云服务器,2核2G配置、3M固定带宽、40G系统盘,第二年续费还是…

印刷包装服务预约小程序的作用是什么

印刷包装厂家非常多,其主要服务为名片印刷、礼品纸袋定制、画册宣传单印刷等,这些服务对大多数企业都有很高的需求,同时具备批量、长期合作属性,同时具备跨区域合作性,所以品牌可扩展度高。 但高需求的同时&#xff0…

【机器学习】K近邻算法:原理、实例应用(红酒分类预测)

案例简介:有178个红酒样本,每一款红酒含有13项特征参数,如镁、脯氨酸含量,红酒根据这些特征参数被分成3类。要求是任意输入一组红酒的特征参数,模型需预测出该红酒属于哪一类。 1. K近邻算法介绍 1.1 算法原理 原理&a…

【GEE学习日记】GEE下载ERA5指定小时数据

1 背景 ERA5数据集提供了逐小时的气象产品,最近做实验需要用到指定日期的14点的气象数据,所以学习了一下。 我的目的:获取2003年每月5,15,25日 14点的空气温度 2 代码 var roi table.geometry(); // table是我上传…

Facebook个人主页和公共主页的区别

Facebook个人主页和公共主页是两种不同类型的页面,它们在功能、用途和管理方面上都是存在着一些明显的区别。本文小编则对他们的区别介绍一下。 首先,个人主页是供普通用户使用的,用于展示个人信息和与朋友、家人保持联系。个人主页通常包括…

curl使用

文章目录 前言一、curl use case常见参数项包括: 二、下载操作我使用第一种方式:不验证证书,果然下载下来了。而且是下载到当前的工作文件夹。C:\Users\xxx\test.zip如果自己想指定文件地址 前言 使用 curl 工具 一、curl use case 常见参数…

编程最佳外挂:批量数据分析与可视化,CodeGeeX工具箱一键完成

ChatGLM3代模型的Code Interpreter能力,本周已经在VSCode里的CodeGeeX插件产品中,以开发者工具箱的产品形态上线。 下图以VSCode插件为例:在CodeGeeX的侧边栏,和智能问答AskCodeGeeX并列出现的工具箱标签,用户登录后就…

【第2章 Node.js基础】2.4 Node.js 全局对象(二)之,process 对象

process 对象 process对象是一个全局对象,提供当前Node.js 进程信息并对其进行控制。通常用于编写本地命令行程序。 1.进程事件 process对象是EventEmitter类的实例,因此可以使用事件的方式来处理和监听process对象的各种事件。以下是一些常用的proce…

Web APIs——综合案例学生就业统计表

1、学生就业统计表 2、渲染业务 根据持久化数据渲染页面 步骤: ①:读取localstorage本地数据 如果有数据则转换为对象放到变量里面一会使用它渲染页面如果没有则用默认空数组[]为了测试效果,可以先把initData存入本地存储看效果 ②&…

图文多模态大模型综述

自去年底ChatGPT发布后,大模型技术呈井喷式发展态势,学术界和工业界几乎每天都在刷新各个方向的SOTA榜单。随着大模型技术的发展,人们逐渐意识到多模态将是大模型发展的必经之路。其中,图文多模态大模型是一种结合了图像和文本两种…

vivado时序分析-3时序分析关键概念

1、时钟相移 时钟相移对应于延迟时钟波形 , 此波形与因时钟路径内的特殊硬件所导致的参考时钟相关。在 AMD FPGA 中 , 时钟相移通常是由 MMCM 或 PLL 原语引入的 , 前提是这些原语的输出时钟属性 CLKOUT*_PHASE 为非零值。 时序分析期间…

解锁海外网红营销的潜力:关于KOC合作的7大建议

随着社交媒体的崛起,海外网红营销已成为全球各行业的主要趋势之一。传统的广告渠道逐渐被社交媒体平台和网红吸引了大量的广告投放,因此企业需要不断创新,以吸引受众并保持竞争力。其中,KOC合作是一个备受关注的策略,它…

openGauss学习笔记-121 openGauss 数据库管理-设置密态等值查询-使用JDBC操作密态数据库

文章目录 openGauss学习笔记-121 openGauss 数据库管理-设置密态等值查询-使用JDBC操作密态数据库121.1 连接密态数据库121.2 调用isValid方法刷新缓存示例121.3 执行密态等值查询相关的创建密钥语句121.4 执行密态等值查询相关的创建加密表的语句121.5 执行加密表的预编译SQL语…