第十章 番外篇:DDP

news2025/1/22 19:38:52

参考教程:
what is DDP
pytorch distributed overview

文章目录

  • DDP介绍
    • 什么是DDP
    • DistributedSampler()
    • DistributedDataParallel()
  • 使用DDP
    • 代码示例
    • multiprocessing.spawn()
    • save and load checkpoints

DDP介绍

什么是DDP

DDP的全称是DistributedDataParallel,它允许使用pytorch使用数据并行训练。
在非分布式训练中,你的模型被放在一个gpu上,你的模型接收来自一个dataloader的inputbatch,它对输入进行前向传播并且计算损失,然后进行反向传播并计算参数的梯度。优化器在这个过程中对模型的参数进行更新。
当我们现在多个gpu上进行训练时,DDP会在每个GPU上都启动一个进程,每个进程中都有一个模型的复制品replica(local copy),每一个模型和优化器的复制品都是相同的。模型的参数初始化相同,优化器也使用一样的随机种子。DDP在训练过程中维持了这种一致性。
现在,每个gpu的模型都是一样的。我们仍然从dataloader中获取inputbatch,但是这个时候我们使用的是DistributedSampler。这个sampler保证了每个模型接收到的inputbatch是不同的,这就是“data parallel” in DDP。
在每个进程中,模型接收到一个不同的输入,并且locally的进行前向传播和后向传播。因为输入是不同的,所以梯度的累积也是不同的。使用optimizer.step()会产生不同的结果。比如说我们现在有4个gpu,在4个gpu上有4个模型的local copy,那么这四个模型就是不同的模型,而不是我们想要的一个分布式的模型。
DDP中的同步机制,会把来自四个replicas的梯度累加起来,其中使用了一个算法,在replicas间进行通信,它不会等待所有的梯度都计算完成,而是在计算过程中就保持着通信,这保证了你的gpu总是在工作的。
在这里插入图片描述
综合来说,就是在n个gpu上的训练相当于同时进行n个模型的训练,这些模型是完全一样的,实时同步的,只不过传入的数据不一样。

DistributedSampler()

在之前介绍数据集构建的时候曾经提到过sampler的概念。假如dataloader中的shuffle = True,你使用的就是一个randomsampler;假如dataloader中的shuffle = False,你使用的就是一个sequentialsampler。

torch.utils.data.distributed.DistributedSampler(dataset, num_replicas=None, rank=None, shuffle=True, seed=0, drop_last=False)

DistributedSampler传入的第一个参数是dataset,它要求这个Dataset有一个constant的大小。iterable dataset也可以使用DDP,参考https://github.com/Lightning-AI/lightning/issues/15734。我先不看了,用的时候再说。
第二个参数是num_replicas,你想要复制的个数。第三个参数是rank,是当前的process的index。shuffle和seed主要是控制你的dataset是否要shuffle和随机种子。

DistributedSampler()一般和torch.nn.parallel.DistributedDataParallel组合使用。在这种情况下,每一个进程都会传递一个DistributedSampler实例,并且加载原始数据中彼此完全不同的子集。

在构造函数初始化阶段,会计算每个replica中分配到的samples数。

if self.drop_last and len(self.dataset) % self.num_replicas != 0:  # type: ignore[arg-type]
            # Split to nearest available length that is evenly divisible.
            # This is to ensure each rank receives the same amount of data when
            # using this Sampler.
            self.num_samples = math.ceil(
                (len(self.dataset) - self.num_replicas) / self.num_replicas  # type: ignore[arg-type]
            )
        else:
            self.num_samples = math.ceil(len(self.dataset) / self.num_replicas)  # type: ignore[arg-type]

最终获得的dataset中数据的总数就是

self.total_size = self.num_samples * self.num_replicas

在取数据的时候,会把不同的数据分配给不同的replicas。

indices = indices[self.rank:self.total_size:self.num_replicas]

在使用DistributedSampler时,在每个epoch都需要调用set_epoch(epoch)函数,这样每个epoch里数据都会按照不同的顺序获取,如果不调用的话,获取的顺序将始终保持一致。因为它随机种子的生成也和当前的epoch有关系。

g.manual_seed(self.seed + self.epoch)

下面给出了一个使用的例子。

sampler = DistributedSampler(dataset) if is_distributed else None
loader = DataLoader(dataset, shuffle=(sampler is None),
                    sampler=sampler)
for epoch in range(start_epoch, n_epochs):
    if is_distributed:
        sampler.set_epoch(epoch)
    train(loader)

DistributedDataParallel()

torch.nn.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False, static_graph=False)

DDP为了保证不同gpu上的模型能完成同步,需要进行较为繁琐的设置。
使用这个类要求你先对torch.distributed进行初始化,通过调用方法。

torch.distributed.init_process_group(backend=None, init_method=None, timeout=datetime.timedelta(seconds=1800), world_size=- 1, rank=- 1, store=None, group_name='', pg_options=None)

在DistributedDataParallel()中,第一个参数module是你想要并行话的module,在训练中也就是你的模型。
参数device_id和output_device代表你的moudle的CUDAdevice和你的output的CUDAdevice。
process_group如果为None,会默认使用你的torch.distributed.init_process_group。

给出一个简单的示例

torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
net = torch.nn.parallel.DistributedDataParallel(model)

为了让你的DDP能够在N个GPU上使用,你需要产生N个进程,并且保证每个进程独立的工作在一个GPU上。有两个方法可以完成这个设置。
第一个方法是:

torch.cuda.set_device(i)

在每个进程中,你需要进行以下的设置。

torch.distributed.init_process_group(
    backend='nccl', world_size=N, init_method='...'
)
model = DistributedDataParallel(model, device_ids=[i], output_device=i)

使用DDP

建议直接参考https://github.com/pytorch/examples/blob/main/distributed/ddp-tutorial-series/multigpu.py。给出了一个完整的模版,可以直接套用。

代码示例

教程中给出了一个比较简单但完整的DDP示例。
在这个例子中,init_process_group被封装到一个setup函数里。

import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

然后定义一个作为例子的小模型。使用torch.multiprocess产生多个进程,rank表示当前的进程数,或者说CUDA id。world_size代表了gpu的个数。在每个进程中,将你的model放到对应的device上。
这个例子中没有加入dataset的部分。只有DDP中模型的使用。

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

multiprocessing.spawn()

torch.multiprocessing.spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method='spawn')

torch.multiprocessing基本上就是在python的multiprocessing上进行了一些改动。具体的不讲了【因为我也没有看】,我们只说一下spawn。

spawn的输入包括 fn:目标函数, args:函数的参数,nprocs:需要的进程数。

在使用这个方法后,multiprocessing会以‘spawn’的方法来创建新的进程。创建的数量由nprocs决定。

for i in range(nprocs):
        error_queue = mp.SimpleQueue()
        process = mp.Process(
            target=_wrap,
            args=(fn, i, args, error_queue),
            daemon=daemon,
        )
        process.start()
        error_queues.append(error_queue)
        processes.append(process)

可以看到在下方代码的mp.Process中使用的target是一个名为_wrap的函数。这个函数把当前process对应的index和args组合在一起,作为输入传入到fn中去。所以理论上来讲,你定义的fn的传入参数数量至少为1个,也就是当前进程的index。而在使用torch中的mp.spawn()时,args里面你不能把这个参数写出来,而只需要写第二个以及以后的参数。

def _wrap(fn, i, args, error_queue):
    # prctl(2) is a Linux specific system call.
    # On other systems the following function call has no effect.
    # This is set to ensure that non-daemonic child processes can
    # terminate if their parent terminates before they do.
    _prctl_pr_set_pdeathsig(signal.SIGINT)

    try:
        fn(i, *args)  # index和args一起作为args传给了fn
    except KeyboardInterrupt:
        pass  # SIGINT; Killed by parent, do nothing
    except Exception:
        # Propagate exception to parent process, keeping original traceback
        import traceback
        error_queue.put(traceback.format_exc())
        sys.exit(1)

save and load checkpoints

模型的保存和加载方法和之前的使用上区别不大。只是加了一点限制。
在下方的示例代码中,在DDP中保存state_dict()时用的是model.module.state_dict()。确认了一下发现在ddp(model)的示例化时,model会被赋值给self.module。
但是有的例子中并没有使用module.state_dict()来完成保存。
这里为了防止重复保存model,限制了只在gpu_id == 0 时才进行模型的保存。

- ckp = self.model.state_dict()
+ ckp = self.model.module.state_dict()
...
...
- if epoch % self.save_every == 0:
+ if self.gpu_id == 0 and epoch % self.save_every == 0:
   self._save_checkpoint(epoch)

一个优化方法是将模型的每次更新都保存在一个进程上然后被加载到所有进程中,这样可以防止多次写入。因为所有的进程都是从一样的参数开始的,并且bp中的梯度计算总是同步的,因此不同gpu上的optimizer也要保持同步。
** make sure no process starts loading before the saving is finished。**
在下面的例子中,在rank == 0的情况下才会进行model的保存。使用dist.barrier()限制了save期间的别的process对模型的加载。

def demo_checkpoint(rank, world_size):
    print(f"Running DDP checkpoint example on rank {rank}.")
    setup(rank, world_size)

    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])


    CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
    if rank == 0:
        # All processes should see same parameters as they all start from same
        # random parameters and gradients are synchronized in backward passes.
        # Therefore, saving it in one process is sufficient.
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

    # Use a barrier() to make sure that process 1 loads the model after process
    # 0 saves it.
    dist.barrier()
    # configure map_location properly
    map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
    ddp_model.load_state_dict(
        torch.load(CHECKPOINT_PATH, map_location=map_location))

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)

    loss_fn(outputs, labels).backward()
    optimizer.step()

    # Not necessary to use a dist.barrier() to guard the file deletion below
    # as the AllReduce ops in the backward pass of DDP already served as
    # a synchronization.

    if rank == 0:
        os.remove(CHECKPOINT_PATH)

    cleanup()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/670829.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Qt控件学习

目录 QPushButton QToolButton QRadioButton QCheckBox QPushButton MainWindow::MainWindow(QWidget *parent): QMainWindow(parent), ui(new Ui::MainWindow) {ui->setupUi(this);ui->n1->setText("我的世界");ui->n1->setIcon(QIcon(":/111…

异构广告混排在美团到店业务的探索与实践

转子:https://tech.meituan.com/2022/03/10/exploration-and-practice-of-heterogeneous-ad-mixed-ranking-in-meituan-ads.html 1 背景与简介 1.1 背景 美团到店广告负责美团搜索流量的商业变现,服务于到店餐饮、休娱亲子、丽人医美、酒店旅游等众多…

文言一心,ChatGLM-6B和ChatGPT等模型概述

原文首发于博客文章大语言模型概况 定义 (个人理解的)大语言模型(Large Language Model)是一种基于深度学习技术的自然语言处理通用模型,它可以通过学习大规模文本数据的模式和规律,从而实现对自然语言的理…

【面试】你知道数据库能抗多大并发压力吗?

文章目录 前言一、一般业务系统运行流程图二、一台4核8G的机器能扛多少并发量呢?三、高并发来袭时数据库会先被打死吗?四、数据库架构可以从哪些方面优化?4.1、根据业务系统拆分多个数据库机器优化方案4.2、读写分离架构优化方案4.3、分库分表…

华为手表上架(1)HarmonyOS应用 打包 .app

华为手表上架(1)HarmonyOS应用 打包 .app 初环境与设备先找到配置项目结构的地方: Project Structure配置证书打包 .app 在本文中,我们介绍探讨华为手表上架的流程,并重点介绍HarmonyOS应用的打包过程。了解如何将应用…

locust学习教程(5) - 分布式执行

目录 前言 1、基础 2、操作步骤 2.1、启动主节点 2.2、启动从节点 2.2.1、启动自己电脑的从节点 2.2.2、启动同事电脑的从节点 2.3、开始并发 3、无web界面,定时运行,数据存储在csv中、等待4个节点连接后自动开始 4、有web界面,定时运…

5年测试老鸟整理,企业自动化测试经验,不要再走弯路了...

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 2023年&#xff0…

vue-antdesign——实现table单元格合并+换行展示+滚动到指定位置+行样式设置——技能提升

最近在写后台管理系统,遇到一个需求,就是要实现表格table组件的合并单元格,并实现编辑等功能。 效果图如下: 下面做一下记录: 1.表格table组件实现单元格合并 之前写过一篇文章关于单元格合并的功能,这…

倒谱和同态去卷积

本文首发于算法社区,转载请注明出处,谢谢。 前言 倒谱的内容比较少,比较相关的就是同态系统去卷积了,所以把这两个放在一起了。 倒谱 我们考虑一个具有 z z z 变换的序列 { x n } \left \{ x_{n} \right \} {xn​} 。我们假设…

Elasticsearch(十一)搜索---搜索匹配功能②--range查询和exists查询

一、前言 继上一节学习了ES的搜索的查询全部和term搜索后,此节将把搜索匹配功能剩余的2个学习完,分别是range搜索和exists搜索 二、range范围搜索 range查询用于范围查询,一般是对数值型和日期型数据的查询。使用range进行范围查询时&…

创建多线程的四种方式

目录儿 一、创建线程的四种方式1. 继承Thread类2. 实现Runnable接口3. 实现Callable接口4. 线程池禁止使用 Executors 构建线程池构建线程池的正确方式 一、创建线程的四种方式 1. 继承Thread类 ① 创建一个类继承Thread类,重写run()方法 ② 调用start()方法启动线…

AI卷入618战场;印象AI开放次数限制;2023 AIGC人才趋势洞察报告;员工瞒着老板悄悄用AI;超好用的AI头像生成教程 | ShowMeAI日报

👀日报&周刊合集 | 🎡生产力工具与行业应用大全 | 🧡 点赞关注评论拜托啦! 🤖 澳洲游戏媒体 Gamurs 招聘AI编辑,被各路媒体口诛笔伐 上周,澳洲知名游戏媒体集团 Gamurs 在官网招聘「AI Edit…

负载测试和压力测试有何区别?资深测试老鸟总结,一篇搞定...

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 负载与压力测试 …

js中数组对象去重的几种方式

js中数组对象去重的几种方式 1、方法一:双层for循环2、对象访问属性的方法3、Map()方法4、reduce方法 首先我们定义数组的形式 let arrObj [{ name: "张三", key: 1 },{ name: "李四", key: 2 },{ name: "王五", key: 3 },{ name: &…

基于langchain+chatGLM搭建部署本地私有化知识库系统

前言 一、 自主GPT 所谓自主(autonomous)GPT是设计一个Agent,让它自己做计划、决策和动作,通过源源不断的迭代,去完成设定的目标。比如 AutoGPT 。 AutoGPT把GPT的能力推向了更高的应用层次。设定一个任务&#xff…

mac下qtcreator代码格式化

代码开发:qtcreator qtcreator 插件:Beautifier 格式化工具:clang-format 1、Beautifier插件安装 将复现框勾选后重启qtcreator即可。 2、安装clang-format工具 (1)打开终端输入下面命令等待安装完成 brew install…

Python基础(19)——Python函数讲解一

Python基础(19)——Python函数讲解一 文章目录 Python基础(19)——Python函数讲解一目标一. 函数的作用二. 函数的使用步骤2.1 定义函数2.2 调用函数2.3 快速体验 三.函数的参数作用四.函数的返回值作用4.1 应用 五.函数的说明文档…

大快人心,华为EDA领域的新突破,关联软件已取得全面适配

EDA被称为“芯片之母”,是集成电路、电子信息,甚至是全球数字经济的赋能者,是许多电子产业链的基石。 一直以来,就被国际的三大巨头占领:美国Synopsys、美国Cadence、德国Mentor Graphics,市场份额高达90%。…

locust学习教程(6)- 使用更快的http客户端:FastHttpUser

目录 1、概念 2、估算电脑允许的最大并发数 3、fasthttpuser的使用 🎁更多干货 完整版文档下载方式: 1、概念 Locust 的默认 HTTP 客户端使用的是 python-requests 库。如果我们需要运行非常高的吞吐量测试,去判断吞吐量是否达到预期值&…

免费文字转语音软件哪个好?推荐这三款文字转语音软件给你

文字转语音软件可以将我们输入的文字内容转化为人声朗读出来,这在很多场合都非常实用。比如,在开车或者做家务时,无法手持手机进行阅读,但是通过文字转语音功能,就可以轻松地听取所需内容。然而,市面上的文…