PyTorch数据并行(DP/DDP)浅析

news2024/9/21 20:41:51

一直以来都是用的单机单卡训练模型,虽然很多情况下已经足够了,但总有一些情况得上分布式训练:

  • 模型大到一张卡放不下;
  • 单张卡batch size不敢设太大,训练速度慢;
  • 当你有好几张卡,不想浪费;
  • 展示一下技术

由于还没遇到过一张显卡放不下整个模型的情况,本文的分布式训练仅限数据并行。主要从数据并行的原理和一些简单的实践例子进行说明。

文章目录

    • 原理介绍
    • DataParallel
      • 小样
    • DistributedDataParallel
      • 小样
      • 一些概念
    • DDP与DP的区别
    • 参考

原理介绍

与每个step一个batch数据相比,数据并行是指每个step用更多的数据(多个batch)进行计算——即多个batch的数据并行进行前向计算。既然是并行,那么就涉及到多张卡一起计算。单卡和多卡训练过程如下图1所示,主要有三个过程:

  • 各卡分别计算损失和梯度,即图中红线部分;
  • 所以梯度整合到主device,即图中蓝线部分;
  • 主device进行参数更新,并将新模型拷贝到其他device上,即图中绿线部分。
../_images/ps.svg
左图是单GPU训练;右图是多GPU训练的一个变体:(1)计算损失和梯度,(2)所有梯度聚合在一个GPU上,(3)发生参数更新,并将参数重新广播给所有GPU

如果不使用数据并行,在显存足够的情况下,我们可以将batch_size设大,这和数据并行的区别在哪呢?如果只将batch_size设大,计算还是在一张卡上完成,速度相对来说是不如将数据均分后放在不同卡上并行计算的。当然,考虑到卡之间的通信问题,要发挥多卡并行的力量需要进行一定权衡。

torch中主要有两种数据并行方式:DP和DDP。

DataParallel

DP是较简单的一种数据并行方式,直接将模型复制到多个GPU上并行计算,每个GPU计算batch中的一部分数据,各自完成前向和反向后,将梯度汇总到主GPU上。其基本流程:

  1. 加载模型、数据至内存;
  2. 创建DP模型;
  3. DP模型的forward过程:
    1. 一个batch的数据均分到不同device上;
    2. 为每个device复制一份模型;
    3. 至此,每个device上有模型和一份数据,并行进行前向传播;
    4. 收集各个device上的输出;
  4. 每个device上的模型反向传播后,收集梯度到主device上,更新主device上的模型,将模型广播到其他device上;
  5. 3-4循环。

在DP中,只有一个主进程,主进程下有多个线程,每个线程管理一个device的训练。因此,DP中内存中只存在一份数据,各个线程间是共享这份数据的。DP和Parameter Server的方式很像。

小样

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

# 假设我们有一个简单的数据集类
class SimpleDataset(Dataset):
    def __init__(self, data, target):
        self.data = data
        self.target = target

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.target[idx]

# 假设我们有一个简单的神经网络模型
class SimpleModel(nn.Module):
    def __init__(self, input_dim):
        super(SimpleModel, self).__init__()
        self.fc = nn.Linear(input_dim, 1)

    def forward(self, x):
        return torch.sigmoid(self.fc(x))

# 假设我们有一些数据
n_sample = 100
n_dim = 10
batch_size = 10
X = torch.randn(n_sample, n_dim)
Y = torch.randint(0, 2, (n_sample, )).float()

dataset = SimpleDataset(X, Y)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# ===== 注意:刚创建的模型是在 cpu 上的 ===== #
device_ids = [0, 1, 2]
model = SimpleModel(n_dim).to(device_ids[0])
model = nn.DataParallel(model, device_ids=device_ids)


optimizer = optim.SGD(model.parameters(), lr=0.01)

for epoch in range(10):
    for batch_idx, (inputs, targets) in enumerate(data_loader):
        inputs, targets = inputs.to('cuda'), targets.to('cuda')
        outputs = model(inputs)
        
        loss = nn.BCELoss()(outputs, targets.unsqueeze(1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item()}')

其中最重要的一行便是:

model = nn.DataParallel(model, device_ids=device_ids)

注意,模型的参数和缓冲区都要放在device_ids[0]上。在执行forward函数时,模型会被复制到各个GPU上,对模型的属性进行更新并不会产生效果,因为前向完后各个卡上的模型就被销毁了。只有在device_ids[0]上对模型的参数或者buffer进行的更新才会生效!2

DistributedDataParallel

DDP,顾名思义,即分布式的数据并行,每个进程独立进行训练,每个进程会加载完整的数据,但是读取不重叠的数据。DDP执行流程3

  • 准备阶段

    • 环境初始化
      • 在各张卡上初始化进程并建立进程间通信,对应代码:init_process_group
    • 模型广播
      • 将模型parameter、buffer广播到各节点,对应代码:model = DDP(model).to(local_rank)
    • 创建管理器reducer,给每个参数注册梯度平均hook。
  • 准备数据

    • 加载数据集,创建适用于分布式场景的数据采样器,以防不同节点使用的数据不重叠。
  • 训练阶段

    • 前向传播
      • 同步各进程状态(parameter和buffer);
      • 当DDP参数find_unused_parametertrue时,其会在forward结束时,启动一个回溯,标记未用到的参数,提前将这些设置为ready
    • 计算梯度
      • reducer外面:各进程各自开始反向计算梯度;
      • reducer外面:当某个参数的梯度计算好了,其之前注册的grad hook就会触发,在reducer里把这个参数的状态标记为ready
      • reducer里面:当某个bucket的所有参数都是ready时,reducer开始对这个bucket的所有参数开始一个异步的all-reduce梯度平均操作;
      • reducer里面:当所有bucket的梯度平均都结束后,reducer把得到的平均梯度正式写入到parameter.grad里。
    • 优化器应用梯度更新参数。

小样

import argparse
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset

import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

# 1. 基础模块 ### 
class SimpleModel(nn.Module):
    def __init__(self, input_dim):
        super(SimpleModel, self).__init__()
        self.fc = nn.Linear(input_dim, 1)
        cnt = torch.tensor(0)
        self.register_buffer('cnt', cnt)

    def forward(self, x):
        self.cnt += 1
        # print("In forward: ", self.cnt, "Rank: ", self.fc.weight.device)
        return torch.sigmoid(self.fc(x))

class SimpleDataset(Dataset):
    def __init__(self, data, target):
        self.data = data
        self.target = target

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.target[idx]
    
# 2. 初始化我们的模型、数据、各种配置  ####
## DDP:从外部得到local_rank参数。从外面得到local_rank参数,在调用DDP的时候,其会自动给出这个参数
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", default=-1, type=int)
FLAGS = parser.parse_args()
local_rank = FLAGS.local_rank

## DDP:DDP backend初始化
torch.cuda.set_device(local_rank)
dist.init_process_group(backend='nccl')

## 假设我们有一些数据
n_sample = 100
n_dim = 10
batch_size = 25
X = torch.randn(n_sample, n_dim)  # 100个样本,每个样本有10个特征
Y = torch.randint(0, 2, (n_sample, )).float()

dataset = SimpleDataset(X, Y)
sampler = torch.utils.data.distributed.DistributedSampler(dataset)
data_loader = DataLoader(dataset, batch_size=batch_size, sampler=sampler)

## 构造模型
model = SimpleModel(n_dim).to(local_rank)
## DDP: Load模型要在构造DDP模型之前,且只需要在master上加载就行了。
ckpt_path = None
if dist.get_rank() == 0 and ckpt_path is not None:
    model.load_state_dict(torch.load(ckpt_path))

## DDP: 构造DDP model —————— 必须在 init_process_group 之后才可以调用 DDP
model = DDP(model, device_ids=[local_rank], output_device=local_rank)

## DDP: 要在构造DDP model之后,才能用model初始化optimizer。
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)
loss_func = nn.BCELoss().to(local_rank)

# 3. 网络训练  ###
model.train()
num_epoch = 100
iterator = tqdm(range(100))
for epoch in iterator:
    # DDP:设置sampler的epoch,
    # DistributedSampler需要这个来指定shuffle方式,
    # 通过维持各个进程之间的相同随机数种子使不同进程能获得同样的shuffle效果。
    data_loader.sampler.set_epoch(epoch)
    # 后面这部分,则与原来完全一致了。
    for data, label in data_loader:
        data, label = data.to(local_rank), label.to(local_rank)
        optimizer.zero_grad()
        prediction = model(data)
        loss = loss_func(prediction, label.unsqueeze(1))
        loss.backward()
        iterator.desc = "loss = %0.3f" % loss
        optimizer.step()

    # DDP:
    # 1. save模型的时候,和DP模式一样,有一个需要注意的点:保存的是model.module而不是model。
    #    因为model其实是DDP model,参数是被`model=DDP(model)`包起来的。
    # 2. 只需要在进程0上保存一次就行了,避免多次保存重复的东西。
    if dist.get_rank() == 0 and epoch == num_epoch - 1:
        torch.save(model.module.state_dict(), "%d.ckpt" % epoch)

结合上面的代码,一个简化版的DDP流程:

  1. 读取DDP相关的配置,其中最关键的就是:local_rank
  2. DDP后端初始化:dist.init_process_group
  3. 创建DDP模型,以及数据加载器。注意要为加载器创建分布式采样器(DistributedSampler);
  4. 训练。

DDP的通常启动方式:

CUDA_VISIBLE_DEVICES="0,1" python -m torch.distributed.launch --nproc_per_node 2 ddp.py

一些概念

以上过程中涉及到一些陌生的概念,其实走一遍DDP的过程就会很好理解:每个进程是一个独立的训练流程,不同进程之间共享同一份数据。为了避免不同进程使用重复的数据训练,以及训练后同步梯度,进程间需要同步。因此,其中一个重点就是每个进程序号,或者说使用的GPU的序号。

  • node:节点,可以是物理主机,也可以是容器;
  • ranklocal_rank:都表示进程在整个分布式任务中的编号。rank是进程在全局的编号,local_rank是进程在所在节点上的编号。显然,如果只有一个节点,那么二者是相等的。在启动脚本中的--nproc_per_node即指定一个节点上有多少进程;
  • world_size:即整个分布式任务中进程的数量。

DDP与DP的区别

  • DP是单进程多线程的,只能在单机上工作;DDP是多进程的,可以在多级多卡上工作。DP通常比DDP慢,主要原因有:1)DP是单进程的,受到GIL的限制;2)DP每个step都需要拷贝模型,以及划分数据和收集输出;
  • DDP可以与模型并行相结合;
  • DP的通信成本随着卡数线性增长,DDP支持Ring-AllReduce,通信成本是固定的。

本文利用pytorch进行数据并行训练进行了一个粗浅的介绍。包括DP和DDP的基本原理,以及简单的例子。实际在分布式过程中涉及到的东西还是挺多的,比如DP/DDP中梯度的回收是如何进行的,DDP中数据采样的细节,DDP中的数据同步操作等。更多的还是要基于真实的需求出发才能真的体会得到。

参考


  1. 参数服务器-动手学深度学习2.0. ↩︎

  2. dataparallel ↩︎

  3. Pytorch Distributed Data Parallal. ↩︎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1365863.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

校园跑腿小程序(前后端已完成)可做项目,可当毕设,支持二创

此小程序为我单独在小程序上运行的结果,图片信息、列表信息等没有出现是因为服务器到期了,资源被释放了,无法显示。但是后端是已经实现了的,有兴趣的同学可以私聊我。 效果预览

软件系统设计开发规程

软件设计开发规程目的在于为需求设计、开发、实现解决方案。根据适当情况,解决方案、设计和实现包括单独的产品、产品组件以及产品相关的生命周期的过程,或者它们的组合,以及包括如何利用准则进行接口设计。 技术解决方案过程包括&#xff1a…

工作组,本地用户,资源共享--windows sever 2012 r2

1.建立2个用户:stu1,stu2,stu2第一次登陆必须修改密码 2.建立2个用户:zhangsan,lisi 3.删除stu1 4.修改zhangsan的密码为111111 5.修改lisi的名字为wangwu 6.修改wangwu的密码为222222 7.禁用王五的账号 8.建立2个组:class1,class…

Docker Compose--部署SpringBoot项目--实战

原文网址:Docker Compose--部署SpringBoot项目--实战-CSDN博客 简介 本文用实战介绍Docker Compose部署SpringBoot项目。 ----------------------------------------------------------------------------------------------- 分享Java真实高频面试题&#xff0c…

VMware Workstation——安装VMware Workstation Pro 17

目录 一、下载 二、安装 1、双击安装 2、安装向导 3、最终用户许可协议 4、自定义安装 5、用户体验设置 6、快捷方式 7、准备升级 8、正在安装 9、安装完成 10、输入许可证密钥 11、激活成功 12、桌面图标 一、下载 下载 VMware Workstation Pro 二、安装 1、双…

中电金信推出行业首个外汇客户风险管理系统,助力金融机构稳中提效

中电金信基于多年的行业积累洞察、和对金融机构的访谈调研发现,《办法》推出前,部分银行缺乏内控系统,只能手工判断客户是否符合便利化办理条件,并需要企业提供资料,同时缺乏交易风险审查,事后抽查存在不愿…

互联网加竞赛 基于卷积神经网络的乳腺癌分类 深度学习 医学图像

文章目录 1 前言2 前言3 数据集3.1 良性样本3.2 病变样本 4 开发环境5 代码实现5.1 实现流程5.2 部分代码实现5.2.1 导入库5.2.2 图像加载5.2.3 标记5.2.4 分组5.2.5 构建模型训练 6 分析指标6.1 精度,召回率和F1度量6.2 混淆矩阵 7 结果和结论8 最后 1 前言 &…

Docker极限压缩tar镜像,20G变10G

🎈普通打包命令: docker save -o [压缩包名字] [镜像名字] 👑极限压缩命令: docker save [镜像名字] | gzip> [压缩包名字] 先上主题,放上两条命令,请随意取用。 对于动态修改的环境,一…

UR机器人旋转矢量和俯仰角的转换

我们在使用机器人的时候,总是习惯使用俯仰角来描述机器人末端姿态的变换。这样更直观,但是机器人为了插值方便计算,总是采用旋转矢量来来描述机器人的姿态。该旋转矢量及不直观,单一轴角度旋转时还可以理解,当两个轴或…

vue3中使用elementplus中的el-tree-select,自定义显示名称label

<el-tree-select v-model"addPval" node-key"id" :data"menulists" :render-after-expand"false" :props"menuProps" /> <el-divider />let menuProps {//自定义labellabel: (data: { name: any; }) > {ret…

c语言的一些题(2024_1_7)

变种水仙花数 #include <stdio.h>int main() {int a 10000;for (; a < 100000; a){if ((a / 10000) * (a % 10000) (a / 1000) * (a % 1000) (a / 100) * (a % 100) (a / 10) * (a % 10) a)printf("%d ", a);}return 0; } //变种水仙花数 - Lily Num…

第12课 实现桌面与摄像头叠加

在上一节&#xff0c;我们实现了桌面捕获功能&#xff0c;并成功把桌面图像和麦克风声音发送给对方。在实际应用中&#xff0c;有时候会需要把桌面与摄像头图像叠加在一起发送&#xff0c;这节课我们就来看下如何实现这一功能。 1.备份与修改 备份demo11并修改demo11为demo12…

部署可道云网盘的一个漏洞解决

目录 1漏洞展示 2.防范措施 1漏洞展示 因为可道云网盘的上传文档有保存在 /data/Group/public/home/文档/ 中,当别有用心之人知道个人部署的域名与上次的文件后&#xff0c;可以进行访问拿到uid。例我在我部署的网盘上上次一个aa.php 文件&#xff0c;然后拿来演示 然后通过…

听GPT 讲Rust源代码--compiler(37)

File: rust/compiler/rustc_expand/src/errors.rs 在Rust编译器的源代码中&#xff0c;rust/compiler/rustc_expand/src/errors.rs文件的作用是定义了各种错误类型和帮助信息&#xff0c;这些错误和帮助信息用于扩展宏时的错误处理和用户提示。 下面对每个struct进行一一介绍&a…

【产品人卫朋】硬件产品经理:从入门到精通

目录 本文目录 1. 前言说明 2. 内容说明 3. 资料包说明 作者简介 本文目录 1. 前言说明 2. 内容说明 3. 资料包说明 1. 前言说明 本篇内容节选自实体书《硬件产品经理&#xff1a;从入门到精通》。 2. 内容说明 鉴于硬件产品的特殊性&#xff0c;不同产品阶段的时间间…

开源内容管理框架Drupal在Docker本地部署并实现公网远程访问

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

Java研学-Cookie与Session

一 会话跟踪 1 HTTP 无状态 HTTP 协议是无状态的&#xff0c;一问一答没有记忆&#xff0c;无法确定发出请求的用户身份。即各个请求的请求对象所包含的信息并不相同&#xff0c;一个会话中的多个请求之间无法共享数据&#xff0c;此时可以使用会话跟踪技术 2 会话跟踪技术 C…

用Java爬取新房二手房数据看总体大环境

都说现在房市惨淡&#xff0c;导致很多人在观望&#xff0c;那么今天我写一段爬虫&#xff0c;主要是抓取各地新房以及二手房成交状况&#xff0c;然后了解总体楼市是否回暖上升。 以下是Java爬虫程序的代码示例&#xff0c;用于抓取贝壳网新房和二手房数据&#xff1a; impor…

并发(11)

目录 71.ConcurrentHashMap JDK1.7说说其put的机制&#xff1f; 72.ConcurrentHashMap JDK1.7是如何扩容的&#xff1f; 73.ConcurrentHashMap JDK1.8实现的原理是什么&#xff1f; 74.ConcurrentHashMap JDK1.8是如何扩容的&#xff1f; 75.ConcurrentHashMap JDK1.8链…

Maven之私服

1 介绍 团队开发现状分析私服是一台独立的服务器&#xff0c;用于解决团队内部的资源共享与资源同步问题Nexus Sonatype公司的一款maven私服产品 下载地址&#xff1a;https://help.sonatype.com/repomanager3/download win版安装包&#xff1a;https://pan.baidu.com/s/1wk…