使用torch完成多卡训练

news2025/1/4 16:52:31

最近在学detr,顺便学了一下多卡训模型,它的源码写的非常不错!

我自己在detr的代码的基础上实现了一个vae的训练,在mnist数据集上,4张2080上,batch size开到1024,训练快到飞起。

总结一下多卡训练的流程,以及自己遇到的一些坑,一方面方便自己之后回顾,另一方面能帮到大家最好。

可以分几个文件放置不同功能,首先是main.py。

第一步是初始化,初始化各个gpu,每个gpu在用的时候会有编号,这样代码才知道我在那个gpu上跑的。

其中涉及到一些概念,比如说world_size,node,rank,local_rank等。

简单解释一下world_size就是进程总数,因为每个进程对应着一个gpu,所以world_size就是gpu的总数,像我现在的配置,一个机器四张卡,那我的world_size就是4。

node就是计算节点的数目,像我只有一台机子,node那就是1。

rank和local_rank是gpu的编号,对于只有一个机子的情况,两个是一样的,第一个gpu的rank和local_rank就是0,第二个gpu的rank和local_rank就是1...

更加具体的内容可以看这里local_rank,rank,node等理解_rank world size-CSDN博客

代码的化就是这一行

init_distributed_mode(config)

config是啥先不用管,它就是一个字典,里面是使用argparser产生的参数字典。这个函数的作用就是给这个进程指定它的rank,local_rank等信息,我理解是你用

python -m torch.distributed.launch --nproc_per_node=8 --use_env main.py

启动多卡时,这个main.py会在每个卡上都执行,每个卡分配一个进程,也就是我如果有四张gpu的话,那这个main.py就会执行四遍,我也不知道理解的对不对,欢迎佬们批评指正。

下面几个是关于并行的辅助函数,放在了utils.py中

`init_distributed_mode`长下面这个样子,按道理说这个函数抄了用就行,我还是稍微解释一下。

def init_distributed_mode(config):
    if 'RANK' in os.environ and 'WORLD_SIZE' in os.environ:
        config.rank = int(os.environ["RANK"])
        config.world_size = int(os.environ['WORLD_SIZE'])
        config.gpu = int(os.environ['LOCAL_RANK'])
    elif 'SLURM_PROCID' in os.environ:
        config.rank = int(os.environ['SLURM_PROCID'])
        config.gpu = config.rank % torch.cuda.device_count()
    else:
        print('Not using distributed mode')
        config.distributed = False
        return

    config.distributed = True

    torch.cuda.set_device(config.gpu)
    config.dist_backend = 'nccl'
    print('| distributed init (rank {}): {}'.format(
        config.rank, config.dist_url), flush=True)
    torch.distributed.init_process_group(backend=config.dist_backend, init_method=config.dist_url,
                                         world_size=config.world_size, rank=config.rank)
    torch.distributed.barrier()
    setup_for_distributed(config.rank == 0)

当用上面的那个多卡启动命令的时候,环境变量里面就会自己产生RANK,WORLD_SIZE,LOCAL_RANK等,没启动的时候你去查会发现没有,我认为是pytorch自己内部做了啥动作。这个时候每个gpu的RANK,LOCAL_RANK会不一样,那么每个进程就可以知道自己是哪个gpu了。把这些RANK,WORLD_SIZE,LOCAL_RANK加到config这字典里,后面就可以用了。

下面那些命令我觉得抄上就行了。

其中值的说明一下的是

torch.cuda.set_device(config.gpu)

这个命令应该就是设置你的gpu是config.gpu这个,等你用`model.cuda()`或者`model.to('cuda')`的时候,不会默认给你搬到第一张gpu上,而是搬到config.gpu这张上去。参考:torch.device和torch.cuda.set_device()_torch.device('cuda:0')-CSDN博客

torch.distributed.barrier()

这个说的是,如果你第一张gpu准备好了,而其他的gpu还没准备好,那就等在这,等其他都完成,,起一个同步的作用。

setup_for_distributed(config.rank == 0)

这个是关掉其他卡上的打印操作,旨在rank=0,也就是第一张gpu的进程上显示打印信息,不然你控制台就太难看了,好多重复信息。这个函数也是你自己写的,抄上就行,具体我也看不懂。

这个函数长这样

def setup_for_distributed(is_master):
    """
    This function disables printing when not in master process
    """
    import builtins as __builtin__
    builtin_print = __builtin__.print

    def print(*args, **kwargs):
        force = kwargs.pop('force', False)
        if is_master or force:
            builtin_print(*args, **kwargs)

    __builtin__.print = print

ok初始化到这就完成了。

接着回到main.py中。

然后是设置随机种子,这个是必须设的,要保证可复现和不同gpu上一样的随机,否则模型就不一样了,你按照不同的seed初始化模型参数,不同gpu上模型初始化参数不一样,这样是不行的。

    seed = config.seed + get_rank()
    torch.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)

这个地方我有点迷,detr里面的seed为啥是我设置的seed加上我当前进程的rank?那我不同gpu上的seed不就不一样了?比如config.seed=42,那gpu0-3的seed分别是42,43,44,45,这样我训练的时候能保证模型一模一样吗?不是说DistributedDataParallel不同gpu上模型是一模一样的吗,只是数据分布到不同gpu上?我不太懂,希望有明白的佬提点一下。

这里的`get_rank`也非常简单,

def is_dist_avail_and_initialized():
    if not dist.is_available():
        return False
    if not dist.is_initialized():
        return False
    return True    

def get_rank():
    if not is_dist_avail_and_initialized():
        return 0
    return dist.get_rank()

`dist`是`import torch.distributed as dist` 。

接着就是定义模型,然后把模型搬到device上去,这里的config.deice='cuda'。

    model = VAE(config.in_channels, config.latent_dim, config.hidden_dim)
    model.to(device)

接着就是把模型变成DDP的模型,我认为封装后的model里面会有一些同步参数的行为,但是我们用户不太需要了解。

    model_without_ddp = model
    if config.distributed:
        model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[config.gpu])
        model_without_ddp = model.module

这里的`model.module`是把封装好的类里面的模型取出来,因为封装后的DDP模型其实变了好多,model.parameters等方法都没了,没法直接操作模型,而`model.moudle`的作用就是把原来的model取出来,这个model_without_ddp就可以方便我们导入预训练参数,冻结参数等等操作了。

然后就是定义优化器和学习率策略,

    optimizer = optim.Adam(model_without_ddp.parameters(), lr=config.LR, weight_decay=config.weight_decay)
    
    lr_scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=config.scheduler_gamma)

接着是数据的导入,和data_loader的构建,这地方是和原始单gpu有差异的地方之一。

    transform=transforms.Compose([
        transforms.ToTensor()
    ])
    train_dataset = MNIST(config.data_path, train=True, download=True, transform=transform)
    val_dataset = MNIST(config.data_path, train=False, download=True, transform=transform)

    if config.distributed:
        train_sampler = DistributedSampler(train_dataset)
        val_sampler = DistributedSampler(val_dataset, shuffle=False)
    else:
        train_sampler = RandomSampler(train_dataset)
        val_sampler = SequentialSampler(val_dataset)
    
    batch_train_sampler = BatchSampler(train_sampler, config.batch_size, drop_last=True)
    batch_val_sampler = BatchSampler(val_sampler, config.batch_size, drop_last=False)

    train_loader = DataLoader(train_dataset, 
                              batch_sampler=batch_train_sampler, 
                              num_workers=config.num_workers,
                              collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset,
                            batch_sampler=batch_val_sampler,
                            drop_last=False,
                            num_workers=config.num_workers,
                            collate_fn=collate_fn)

第一个是torch.utils.data.DistributedSampler,这函数的就是给不同的gpu分配不通的数据,简单来说就是把编号1,3,5,...的数据分给gpu0,2,4,8,...分给gpu2等等,这一步只是把索引给分好。

第二个BatchSampler就是在把数据进行batch化。

Sampler本质就是个数据分配的东西,指定哪个数据到哪个batch,哪个设备上取训练。

现在就可以开始训练了,第一个有一步是

for epoch in tqdm(range(config.epochs)):
    if config.distributed:
        train_sampler.set_epoch(epoch)

`set_epoch`是必须的用于启动DataLoader的shuffle功能,否则数据集还是顺序的,sampler没用,每个epoch的数据顺序一样,参考https://blog.csdn.net/YoJayC/article/details/121532525。官方的教程里是这么说的,

后面就没啥了,训练更新参数和学习率

train_stats = train_one_epoch(model, train_loader, optimizer, **config)
lr_scheduler.step()

`train_one_epoch`和单卡写法一样,但是里面有个地方,也是非常重要的一个点,就是怎么同步结果,比如loss怎么处理。因为我这个gpu进程得到的是这个进程上数据的loss,比如我有四张卡,第一张卡上batch size有64个数据,会产生一个loss,但是四张卡按理来说是256的batch size,我们的做法应该是把四张卡的loss加起来除以四。当然loss的backward还是跟原来一样,还是在64的batch size上去做哈,只是我们记录的时候呢要把四张卡的loss加起来,在rank=0的卡上记录。

具体做法依赖于一个函数就是`dist.all_reduce`,用于同步的函数。以下面这个函数为例

def get_world_size():
    if not is_dist_avail_and_initialized():
        return 1
    return dist.get_world_size()

def reduce_dict(input_dict, average=True):
    """
    Args:
        input_dict (dict): all the values will be reduced
        average (bool): whether to do average or sum
    Reduce the values in the dictionary from all processes so that all processes
    have the averaged results. Returns a dict with the same fields as
    input_dict, after reduction.
    """
    world_size = get_world_size()
    if world_size < 2:
        return input_dict
    with torch.no_grad():
        names = []
        values = []
        # sort the keys so that they are consistent across processes
        for k in sorted(input_dict.keys()):
            names.append(k)
            values.append(input_dict[k])
        values = torch.stack(values, dim=0)
        dist.all_reduce(values)
        if average:
            values /= world_size
        reduced_dict = {k: v for k, v in zip(names, values)}
    return reduced_dict

我的这个`input_dict`其实是一个字典,里面每个值都是每个进程产生的结果,每个进程产生的结果都不一样,比如第一张卡是{"loss1":0,2, "loss2": 0.11, "loss3": 0.45},第二张卡是{"loss1":0,23, "loss2": 0.01, "loss3": 0.42}等等,做法是首先在每张卡上把loss1,loss2, loss3的内容stack起来成一个张量,比如第一张卡成为tensor([0.2, 0.11, 0.45]),第二张卡为tensor([0.23, 0.01, 0.42])等等,那么dist.all_reduce(values)就会把所有结果加起来,再除以我们的gpu数量,那就是256的batch size的loss了。

`train_one_epoch`代码如下。

import torch
from utils import *
import os
import sys
import math

def train_one_epoch(model:torch.nn.Module,
                    data_loader: torch.utils.data.DataLoader,
                    optimizer: torch.optim.Optimizer,
                    **kwargs):
    model.train()
    loss_lst = []
    recons_loss_lst = []
    kld_loss_lst = []
    for sample_batch in data_loader:
        sample_batch.to(kwargs["device"])

        results = model(sample_batch)
        train_stats = model.module.loss_function(results, **kwargs)
        loss = train_stats['loss']

        # reduce losses over all GPUs for logging purposes
        train_stats_reduced = reduce_dict(train_stats)
        loss_reduced = train_stats_reduced["loss"].item()
        loss_lst.append(loss_reduced)
        recons_loss_lst.append(train_stats_reduced["Reconstruction_loss"].item())
        kld_loss_lst.append(train_stats_reduced["KLD"].item())

        if not math.isfinite(loss_reduced):
            print("Loss is {}, stopping training".format(loss_reduced))
            print(train_stats_reduced)
            sys.exit(1)    

        optimizer.zero_grad()
        loss.backward()
        if kwargs['clip_max_norm'] > 0:
            torch.nn.utils.clip_grad_norm_(model.parameters(), kwargs['clip_max_norm'])
        optimizer.step()

    return {'loss': sum(loss_lst)/len(loss_lst), "Reconstruction_loss": sum(recons_loss_lst)/len(recons_loss_lst), "KLD": sum(kld_loss_lst)/len(kld_loss_lst)}



注意一个地方就是`sample_batch.to(kwargs["device"])`,kwargs["device"]是我们config里面的device设置为‘cuda’没有指定那一张卡,它会默认搬到第一步初始化的`torch.cuda.set_device(config.gpu)`上去。

最后是保存模型,要在第一张卡上去保存,用函数`is_main_process`判断一下,保存就行了。

def is_main_process():
    return get_rank() == 0

def save_on_master(*args, **kwargs):
    if is_main_process():
        torch.save(*args, **kwargs)
save_on_master(
       {
              "model": model_without_ddp.state_dict(),
              "optimizer": optimizer.state_dict(),
              "lr_scheduler": lr_scheduler.state_dict(),
              "epoch": epoch,
              "config": config,
       },
       checkpoint_path)

最后在终端里面输入

python -m torch.distributed.launch --nproc_per_node=4 --use_env main.py --xxxparameters

其中`nproc_per_node`的意思是每张机子上有几张卡,--use_env是第一步中初始化的时候将RANK,LOCAL_RANK等添加到os.environ中的意思。

代码在https://github.com/JHW5981/MISC/tree/main/vae可见。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1661877.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JUC下的CompletableFuture详解

详细介绍 CompletableFuture是Java 8引入的一个实现Future接口的类&#xff0c;它代表一个异步计算的结果。与传统的Future相比&#xff0c;CompletableFuture提供了更丰富的功能&#xff0c;比如链式调用、组合异步操作、转换结果、异常处理等&#xff0c;极大地增强了Java在…

力扣HOT100 - 739. 每日温度

解题思路&#xff1a; 单调栈 class Solution {public int[] dailyTemperatures(int[] temperatures) {int length temperatures.length;int[] ans new int[length];Deque<Integer> stack new LinkedList<>();for (int i 0; i < length; i) {int temperatu…

TCP超时重传机制

一、TCP超时重传机制简介 TCP超时重传机制是指当发送端发送数据后&#xff0c;如果在一定时间内未收到接收端的确认应答&#xff0c;则会认为数据丢失或损坏&#xff0c;从而触发重传机制。发送端会重新发送数据&#xff0c;并等待确认应答。如果在多次重传后仍未收到确认应答&…

VMware Workstation 17 Player 创建虚拟机教程

本教程是以windows server 2012物理机服务器安装好的VMware Workstation 17 Player为例进行演示&#xff0c;安装VMware Workstation 17 Player大家可以自行网上搜索安装。 1、新建虚拟机 双击安装好的VMvare图标&#xff0c;点击创建虚拟机。 2、选择是否安装系统 本步骤选…

复习了好久的软考中项,现在上半年不考了,该怎么办?

如果有更多学习时间的话&#xff0c;可以考虑报考高级职称&#xff0c;因为高级和中级职称的很多知识点有重叠&#xff0c;只需要再复习一下相关论文就可以了。 从2024年下半年开始&#xff0c;集成考试将采用最新版教材和大纲&#xff0c;与高级职称的新版教材内容相似度很高…

Spring框架学习笔记(二):Spring IOC容器配置 Bean,分别基于XML配置bean 和 基于注解配置 bean

1 Spring 配置/管理 bean 介绍 Bean 管理包括两方面 &#xff1a;创建 bean 对象&#xff1b;给 bean 注入属性 Bean 配置方式&#xff1a;基于 xml 文件配置方式&#xff1b;基于注解方式 2 基于 XML 配置 bean 2.1 通过类型来获取 bean 方法&#xff1a;给getBean传入一…

新型AI Stable Artisan横空出世?

StabilityAI宣布推出Stable Artisan 前言 就在今天&#xff0c;Stability AI宣布推出 Stable Artisan&#xff0c;让更广泛的受众能够使用 Stability AI 的 Developer Platform API 功能。Stable Artisan 具有他们的高级型号&#xff0c;例如 Stable Diffusion 3、Stable Video…

4000字超详解Linux权限

各位大佬好 &#xff0c;这里是阿川的博客 &#xff0c; 祝您变得更强 个人主页&#xff1a;在线OJ的阿川 大佬的支持和鼓励&#xff0c;将是我成长路上最大的动力 阿川水平有限&#xff0c;如有错误&#xff0c;欢迎大佬指正 在Linux当中权限的体现主要有两种 普通用户 超…

ARIMA模型在河流水质预测中的应用_含代码

#水质模型 #时间序列 #python应用 ARIMA 时间序列模型简介 时间序列是研究数据随时间变化而变化的一种算法&#xff0c;是一种预测性分析算法。它的基本出发点就是事物发展都有连续性&#xff0c;按照它本身固有的规律进行。ARIMA(p,d,q)模型全称为差分自回归移动平均模型 (A…

动态IP避坑指南:如何挑选合适的动态代理IP?

在如今的网络环境中&#xff0c;使用动态IP代理成为实现隐私保护、访问受限内容和提高网络效率的一种常见方式&#xff0c;选择合适的国外动态IP代理可以让我们的业务处理事半功倍。面对市面上琳琅满目的选择&#xff0c;如何挑选购买适合自己的动态IP代理服务呢&#xff1f;在…

数字化转型失败率80%!盘点国内数字化转型“失败案例”有哪些

尤记得几年前&#xff0c;那桩轰动一时的《国外某巨额投入的数字化转型项目失败所引起的法律纠纷案》。 当时&#xff0c;业界人士几乎都在热议这件事。 我也在了解整件事情的原委后&#xff0c;发表一些感想。 当时我就觉得&#xff0c;作为行业从业人员&#xff0c;不要幸…

动态表名 的使用方法

动态表名插件的底层是 拦截器 1&#xff0c;创建一个拦截器 Configuration public class MybatisConfiguration {Beanpublic DynamicTableNameInnerInterceptor dynamicTableNameInnerInterceptor() {// 准备一个Map&#xff0c;用于存储TableNameHandlerMap<String, Table…

3d gaussian-splatting源码运行及结果展示

笔者是在windows下配置的环境 源码地址及官方教程 github gaussian-splatting 官网给出了详细的配置教程和视频解说 记录一下个人的部署过程 环境需求 硬件需求 具有计算能力 7.0 的带有CUDA的GPU 24G显存 软件需求 python版本我没注意到明确说明&#xff0c;3.7以上应…

用世界语言讲好中国故事 英孚青少儿“中华文化少年说”广州佛山展演开启

秉持“用世界语言&#xff0c;讲好中国故事”的初心&#xff0c;着眼于培养中国青少儿文化素养&#xff0c;提升青少儿文化自信&#xff0c;英孚教育青少儿近日在广州海珠乐峰广场举办了“中华文化少年说”10周年国宝季广佛展演。学员们在舞台上自信表达&#xff0c;用丰富的动…

机器学习算法应用——时间序列分析(4-5)

时间序列分析&#xff08;4-5&#xff09; 时间序列分析&#xff08;Time-Series Analysis&#xff09;是一种对按时间顺序排列的数据序列进行统计分析和预测的方法。这种方法通常用于研究某个现象随时间的变化规律&#xff0c;并据此预测未来的发展趋势。以下是时间序列分析的…

EasyExcel处理Mysql百万数据的导入导出案例,秒级效率,拿来即用!

一、写在开头 今天终于更新新专栏 《EfficientFarm》 的第二篇博文啦&#xff0c;本文主要来记录一下对于EasyExcel的高效应用&#xff0c;包括对MySQL数据库百万级数据量的导入与导出操作&#xff0c;以及性能的优化&#xff08;争取做到秒级性能&#xff01;&#xff09;。 …

【甲辰雜俎】世界上最不可靠的就是人

"世界上最不可靠的就是人" 人是一個多元的複變函數, 今天經受住考驗, 明天你就有可能叛變。 過去是戰場上的仇敵, 明天就有可能成為政治上的盟友。 —— 擷取自電視劇《黑冰》 人的不可預測性, 的確是一個普遍的現象。 每個人都是一個獨特的個體, 受到不同的…

Linux添加IP地址的方法

1.nmcli&#xff1a;命令式的添加IP地址 [rootlocalhost ~]#nmcli connection modify eno16777736 ipv4.addresses 192.168.126.100/24 ipv4.gateway 192.168.126.1 ipv4.method manual connection.autoconnect yes [rootlocalhost ~]# nmcli connection modify eno16777736 i…

第十三届蓝桥杯决赛(国赛)真题 Java C 组【原卷】

文章目录 发现宝藏试题 A: 斐波那契与 7试题 B: 小蓝做实验试题 C: 取模试题 D: 内存空间试题 E \mathrm{E} E : 斐波那契数组试题 F: 最大公约数试题 G: 交通信号试题 I: 打折试题 J: 宝石收集 发现宝藏 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#x…

WPF之绑定属性值转换

1&#xff0c;使用Binding.Format属性简易设置绑定的属性数据显示格式。 <TextBox Grid.Row"2" Grid.Column"1"><TextBox.Text><Binding Path"UnitCost" StringFormat"{}{0:C3}" > …