pytorch中分布式Collective通信API学习

news2024/9/21 4:22:34

       随着各种大模型的不断迭出,感觉现在的NLP领域已经来到了大模型的时代。那么怎么来训练一个大模型呢?其中比较关键的一个技术基础就是分布式训练。在阅读GLM源码的时候,感觉其中的分布式训练代码不是很熟悉,看起来有点吃力,为此专门对pytorch中分布训练环境的搭建和通信API进行了学习,这个对大模型训练中利用不同显卡上的梯度和数据进行训练的理解有着促进作用。

pytorch对不同的后端,有不同的支持,下面来看一张基于1.8版本的torch对后端支持的情况:

 可以看到1.8版本下,pytorch对一些操作是不支持的。由于我们训练的时候大多采用N卡,所以这里就只是对NCCL后端下的API进行学习和理解。

分布式环境初始化

既然是分布式,肯定是多进程的,每一个进程占用一张卡进行计算和推理。同时也要进行进程之间的通信连接和建立,torch自带的multiprocessing和distributed能很好的实现上述功能。初始化流程:首先开启多进程,为每一个进程指定一个后端,得到group;同时设定环境中的主机地址和端口;最后在每个进程实现不同的业务功能。详细代码如下:

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def init_dist_process(rank, world_size, test_fun, backend='nccl'):
    """
    初始化分布式环境,指定主机地址和端口号
    :param rank:
    :param world_size:
    :param test_fun:
    :param backend:
    :return:
    """
    os.environ["MASTER_ADDR"] = '127.0.0.1'
    os.environ["MASTER_PORT"] = '2222'
    # 初始化分布式进程,指定backend和rank world_size
    dist.init_process_group(backend=backend,rank=rank,world_size=world_size)
    #要执行的功能
    test_fun(rank,world_size)



def create_processes_run(world_size,test_fun):
    """
    创建进程给到每一张显卡
    :param world_size:
    :param test_fun:
    :return:
    """
    processes = []
    for rank in range(world_size):
        p = mp.Process(target = init_dist_process,args=(rank,world_size,test_fun))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()



if __name__ == '__main__':
    mp.set_start_method("spawn")
    create_processes_run(world_size=2,test_fun=...)

注意torch多进程启动方式最好采用spawn,然后create_processes_run开启多进程同时初始化分布式环境,然后执行业务功能。下面的各种API测试都是采用如上的流程进行。

1、broadcast

如下图所示,src=0上的tensor经过broadcast后,在group中的所有显卡都能接收到该tensor,因此

out[i] = In[i],每个显卡上的数据都相同。

 代码如下:

def broadcast_test(rank, world_size):
    """
    broadcast测试,把数据从src卡广播到其他所有组内的卡
    :param rank:
    :param world_size:
    :return:
    """
    group = dist.new_group(list(range(world_size)))
    data = [rank+1] * 5
    device = torch.device("cuda",rank)
    tensor = torch.tensor(data,dtype=torch.long).to(device)
    print('before  broadcast Rank:', rank, '-----data:', tensor)
    # 0卡上的数据广播到其他显卡上
    dist.broadcast(tensor=tensor, src=0, group=group)
    print('after broadcast  Rank:', rank, '-----data:', tensor)

把0卡上的数据广播上1卡上,结果如下:

before  broadcast Rank: 1 -----data: tensor([2, 2, 2, 2, 2], device='cuda:1')
before  broadcast Rank: 0 -----data: tensor([1, 1, 1, 1, 1], device='cuda:0')
after broadcast  Rank: 0 -----data: tensor([1, 1, 1, 1, 1], device='cuda:0')
after broadcast  Rank: 1 -----data: tensor([1, 1, 1, 1, 1], device='cuda:1')

可以看到执行广播后,0卡上的数据被广播到1上了,0卡和1卡的数据都是一样的了。

2、reduce

会把当前所有显卡的tensor按照指定的操作后,发送到dst显卡上。如下图所示:

 代码如下:

def reduce_test(rank,world_size):
    """
    reduce测试,归约操作,把所有卡上的数据都归约到dst卡上进行相应的dist.ReduceOp操作
    :param rank:
    :param world_size:
    :return:
    """
    group = dist.new_group(list(range(world_size)))
    data = [rank+1] * 5
    device = torch.device("cuda", rank)
    tensor = torch.tensor(data, dtype=torch.long).to(device)
    print('before  reduce Rank:', rank, '-----data:', tensor)
    #所有的rank都发送数据,dst=0卡接收, op是dist.ReduceOp.SUM
    dist.reduce(tensor=tensor,dst=0,op=dist.ReduceOp.SUM,group=group)
    print('after reduce  Rank:', rank, '-----data:', tensor)

把所有卡上的数据归约到1卡上,归约操作采用sum,结果如下:

before  reduce Rank: 1 -----data: tensor([2, 2, 2, 2, 2], device='cuda:1')
before  reduce Rank: 0 -----data: tensor([1, 1, 1, 1, 1], device='cuda:0')
after reduce  Rank: 1 -----data: tensor([2, 2, 2, 2, 2], device='cuda:1')
after reduce  Rank: 0 -----data: tensor([3, 3, 3, 3, 3], device='cuda:0')

可以看到reduce之后0卡上的数据是reduce之前0卡和1卡的数据之和

3、all_reduce

和reduce的操作一样,只不过是把reduce.op后的数据发送到group内所有的显卡上,如下图所示:

 

def allreduce_test(rank,world_size):
    """
    allreduce测试,归约操作,把所有卡上的数据都归约到所有的卡上进行dist.ReduceOp操作
    :param rank:
    :param world_size:
    :return:
    """
    group = dist.new_group(list(range(world_size)))
    data = [[rank+1] * 5]*5
    device = torch.device("cuda", rank)
    tensor = torch.tensor(data, dtype=torch.long).to(device)
    print('before  all_reduce Rank:', rank, '-----data:', tensor)
    dist.all_reduce(tensor=tensor,op=dist.ReduceOp.SUM,group=group)
    print('after all_reduce  Rank:', rank, '-----data:', tensor)

把所有卡上的数据归约到所有的卡上,结果如下:

before  all_reduce Rank: 1 -----data: tensor([[2, 2, 2, 2, 2],
        [2, 2, 2, 2, 2],
        [2, 2, 2, 2, 2],
        [2, 2, 2, 2, 2],
        [2, 2, 2, 2, 2]], device='cuda:1')
before  all_reduce Rank: 0 -----data: tensor([[1, 1, 1, 1, 1],
        [1, 1, 1, 1, 1],
        [1, 1, 1, 1, 1],
        [1, 1, 1, 1, 1],
        [1, 1, 1, 1, 1]], device='cuda:0')
after all_reduce  Rank: 0 -----data: tensor([[3, 3, 3, 3, 3],
        [3, 3, 3, 3, 3],
        [3, 3, 3, 3, 3],
        [3, 3, 3, 3, 3],
        [3, 3, 3, 3, 3]], device='cuda:0')
after all_reduce  Rank: 1 -----data: tensor([[3, 3, 3, 3, 3],
        [3, 3, 3, 3, 3],
        [3, 3, 3, 3, 3],
        [3, 3, 3, 3, 3],
        [3, 3, 3, 3, 3]], device='cuda:1')

执行all_reduce之后0卡和1卡的数据都是all_reduce之前所有卡的数据之和

4、all_gather

allgather把所有的显卡上的数据收集后,组成一个list发送到所有的显卡上,如下图所示

 0卡和1卡上的数据(不同的颜色)经过allgather后组成一个新的list,发送到所有的显卡上。

代码如下:

def allgather_test(rank,world_size):
    """
    allgather测试,聚合操作,把所有卡上的数据都聚合到一起形成一个list到所有卡上
    :param rank:
    :param world_size:
    :return:
    """
    group = dist.new_group(list(range(world_size)))
    data = [rank+1] * 5
    device = torch.device("cuda", rank)
    tensor = torch.tensor(data, dtype=torch.long).to(device)
    print('before  all_gather Rank:', rank, '-----tensor:', tensor)
    tensor_list = [ torch.zeros_like(tensor).to(tensor.device)   for _ in range(world_size)]
    print('before  all_gather Rank:', rank, '-----tensor_list:', tensor_list)
    dist.all_gather(tensor=tensor,group=group,tensor_list=tensor_list)
    dist.barrier()
    print('after all_gather  Rank:', rank, '-----tensor_list:', tensor_list)

    tensor_list =  torch.stack(tensor_list)
    print('tensor_list shape', tensor_list.shape)
    print('after all_gather  Rank:', rank, '-----tensor_list:', tensor_list)

0卡和1卡上分别是tensor([1, 1, 1, 1, 1], device='cuda:0')和tensor([2, 2, 2, 2, 2], device='cuda:1'),执行allgather后,结果如下

before  all_gather Rank: 0 -----tensor: tensor([1, 1, 1, 1, 1], device='cuda:0')
before  all_gather Rank: 0 -----tensor_list: [tensor([0, 0, 0, 0, 0], device='cuda:0'), tensor([0, 0, 0, 0, 0], device='cuda:0')]
before  all_gather Rank: 1 -----tensor: tensor([2, 2, 2, 2, 2], device='cuda:1')
before  all_gather Rank: 1 -----tensor_list: [tensor([0, 0, 0, 0, 0], device='cuda:1'), tensor([0, 0, 0, 0, 0], device='cuda:1')]
after all_gather  Rank: 0 -----tensor_list: [tensor([1, 1, 1, 1, 1], device='cuda:0'), tensor([2, 2, 2, 2, 2], device='cuda:0')]
after all_gather  Rank: 1 -----tensor_list: [tensor([1, 1, 1, 1, 1], device='cuda:1'), tensor([2, 2, 2, 2, 2], device='cuda:1')]
tensor_list shape torch.Size([2, 5])
tensor_list shape torch.Size([2, 5])
after all_gather  Rank: 0 -----tensor_list: tensor([[1, 1, 1, 1, 1],
        [2, 2, 2, 2, 2]], device='cuda:0')
after all_gather  Rank: 1 -----tensor_list: tensor([[1, 1, 1, 1, 1],
        [2, 2, 2, 2, 2]], device='cuda:1'

0卡和1卡上均能得到之前0卡和1卡的tensor,并组成一个list。

5、reduce_scatter

reduce_scatter 这是一个归并再切分的操作,如下图所示,out0对应的数据就是所有显卡上第0部分的数据经过op.SUM操作后的数据。

 代码如下:

def reduce_scatter(rank, world_size):
    """
    scatter_list 一定要是float型的
    :param rank:
    :param world_size:
    :return:
    """
    group = dist.new_group(list(range(world_size)))
    device = torch.device("cuda", rank)
    if rank == 0:
        scatter_list = [torch.tensor([1,2,3],dtype=torch.float).to(device),torch.tensor([4,5,6], dtype=torch.float).to(device)]
    else:
        scatter_list = [torch.tensor([7,8,9], dtype=torch.float).to(device),torch.tensor([10,11,12], dtype=torch.float).to(device)]

    print('before reduce_scatter  Rank:', rank, '-----scatter_list:', scatter_list)
    output = torch.zeros(1,3).to(device)
    print('before reduce_scatter  Rank:', rank, '-----output:', output)
    dist.reduce_scatter(output=output,input_list=scatter_list,group=group,op=dist.ReduceOp.SUM)
    print('after reduce_scatter  Rank:', rank, '-----output:', output)

0卡和1卡在reduce_scatter之前的数据有个list,执行reduce_scatter之后,结果如下:

before reduce_scatter  Rank: 0 -----scatter_list: [tensor([1., 2., 3.], device='cuda:0'), tensor([4., 5., 6.], device='cuda:0')]
before reduce_scatter  Rank: 1 -----scatter_list: [tensor([7., 8., 9.], device='cuda:1'), tensor([10., 11., 12.], device='cuda:1')]
before reduce_scatter  Rank: 0 -----output: tensor([[0., 0., 0.]], device='cuda:0')
before reduce_scatter  Rank: 1 -----output: tensor([[0., 0., 0.]], device='cuda:1')
after reduce_scatter  Rank: 0 -----output: tensor([[ 8., 10., 12.]], device='cuda:0')
after reduce_scatter  Rank: 1 -----output: tensor([[14., 16., 18.]], device='cuda:1')

把0卡和1卡对应index中的tensor相加起来然后发送到index号显卡上。

参考文章

【深度学习】【分布式训练】Collective通信操作及Pytorch示例

pytorch官方文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/594645.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Nature 2023】Computational approaches streamlining drug discovery

今天为大家介绍的是一篇发表于“Nature”上面的综述文章“Computational approaches streamlining drug discovery”, Computer-aided drug discovery has been around for decades, although the past few years have seen a tectonic shift towards embracing com…

马尔科夫链 Markov chain

马尔科夫链 1.实例参考文献 马尔科夫链(Markov chain)及其模型在机器学习中应用广泛,本文结合一些参考资料做一个总结。 注意的是,本文中提到的马尔科夫链,要与隐马尔科夫(HMM)作以区分。 1.实例 设某人的训练计划中有以下四项运动&#x…

通过VCP(VMware Certified Professional)认证

文章目录 小结要求考试证书参考 小结 通过VCP(VMware Certified Professional)认证,去年参加了VMware的培训,今年通过VMware Certified Professional (2V0-21.20)的考试,拿到证书。 要求 需要参加VMware的指定的培训,我在去年五…

【数据结构】 数组 array

一、什么是数组 连续内存空间,存储的一组相同类型的元素 二、常用操作 1.原理 access:通过索引直接读取元素的值 时间复杂度:O(1) search:遍历查找某个元素 时间复杂度:O(N) insert:插入位置后的元素依…

Generative AI 新世界 | 大语言模型(LLMs)在 Amazon SageMaker 上的动手实践

在上一篇《Generative AI 新世界:大型语言模型(LLMs)概述》中,我们一起探讨了大型语言模型的发展历史、语料来源、数据预处理流程策略、训练使用的网络架构、最新研究方向分析(Amazon Titan、LLaMA、PaLM-E 等&#xf…

QT超市管理系统

QT超市管理系统 前言QT介绍.pro文件主文件(main函数)窗口函数(mainwindow)用户登录(user_login)超市系统数据库(maketsql)超市商品的增删改查(dlg_addmak)收款码界面(picture)结语 前…

亚马逊云科技探路可持续,数智创未来

气候变化是全人类的共同挑战。应对气候变化,事关永续发展,关乎人类前途命运。2020年中国在联合国大会上亚马逊云科技作出庄严承诺,要在2030年实现碳达峰,2060年实现碳中和,这是个全球性的规划,也是个庄严的…

Yolov8轻量化:MobileNetV3,轻量级骨架首选

1.轻量化网络简介 轻量化网络是指在保持模型性能的前提下,尽可能减小模型参数量和计算量的神经网络。这种网络通常被用于在移动设备等资源受限的场景中部署,以提高模型的实时性和运行效率。 轻量化网络的设计思路可以包括以下几个方面: 去除冗余层和参数:通过剪枝、蒸馏等技…

MFC(五)菜单栏和工具栏

这篇文章我们来完成菜单设计和工具栏设计 菜单设计 1.点开资源视图>Menu>IDR_MAINFRAME 通过IDR_MAINFRAME我们可以编辑该资源定义,包括主菜单、其他菜单、工具栏等内容,IDR_MAINFRAME即为默认的主窗口的资源标识符 2.右键相应菜单>新插入&…

数据结构 --- 红黑树

红黑树也是一种自平衡的二叉搜索树,和AVL树比较,插入和删除时,旋转的次数更少。 红黑树的特性: 所有节点都有颜色:红色或者黑色所有null均视为黑色红色节点不能相邻根节点是黑色从根节点到任意一个叶子节点&#xff…

5分钟带你了解,SAS硬盘和SATA硬盘的区别?

一、SAS和SATA的关系 SAS的接口技术可以向下兼容SATA。具体来说,二者的兼容性主要体现在物理层和协议层的兼容。 在物理层,SAS接口和SATA接口完全兼容,SATA硬盘可以直接使用在SAS的环境中,从接口标准上而言,SATA是SAS的…

接口隔离原则:接口里的方法,你都用得到吗?

文章目录 前言接口隔离原则1、角色的合理划分2、定制服务3、接口污染 胖接口减肥总结 前言 在前面几篇文章中中,我们讲的设计原则基本上都是关于如何设计一个类。SRP 告诉我们,一个类的变化来源应该是单一的;OCP 说,不要随意修改…

Fiddler抓包工具之fiddler设置断点和简单的并发测试

断点有两种方式: 1、全局断点 2、局部断点 全局断点 全局断点的特点是:不能针对一个请求,是给所有抓到的请求打断点 全局断点如何设置: 1、快速设置断点:直接点击底部状态栏断点处 ;点击第一下是请求…

比赛记录:Educational Codeforces Round 149 (Rated for Div. 2) A~D

传送门:CF 前提提要:这场狠狠的掉分.C题刚开始少了一个特判,导致自己对自己的构造方法产生了疑问,然后就一直在做无用思考,后来交的时候排名就贼后面,然后D题的题面简直稀烂(虽然D题看懂之后极其简单…),赛时根本看不懂D题意,最终rating掉完.不亏是教育场,被狠狠的教育了 A题…

Web的基本漏洞--命令执行漏洞

目录 一、命令执行漏洞 1.命令执行漏洞的原理 2.命令执行漏洞分类 3.命令执行漏洞的危害 4.命令执行漏洞的防范措施 5.命令执行漏洞的绕过 一、命令执行漏洞 命令执行漏洞是指攻击者可以随意执行系统命令。它属于高危漏洞之一,也属于代码执行的范畴。命令执行…

内置工具横向移动

IPCSchtasks IPC: IPC$是共享"命令管道"的资源,它是为了让进程通信而开放的命名管道,连接双方可以建立安全的通道并以此通道进行加密数据交换,从而实现对远程计算机的访问。 利用条件: 1、开放139、445 2、目标开启…

Node.js V10.24.1 安装步骤(node、cnpm、yarn、vue)

一、下载node.js 下载地址:Download | Node.js 要下载历史低版本请点击“Previous Releases” Previous Releases 本文章以V10.24.1为例 ,下载64位msi 二、安装 下载完成后,一直点击Next直到安装完成,可以自己修改安装位置。…

09.二叉树

09.二叉树 1.树型结构 1.1概念 树是一种非线性的数据结构,它是由n(n>0)个有限结点组成一个具有层次关系的集合。把它叫做树是因为它看起来像一棵倒挂的树,也就是说它是根朝上,而叶朝下的。它具有以下的特点&…

收到字节offer,我却拒绝了...

前言: 大四快毕业了,在等待读研的期间无事可做,所以打算暑期找个实习。 忠告: 本人投了字节某测试岗,看到要求是测开的要求(科班出身需要熟悉一种语言),就以为面完发技术岗的offe…

note注解

元注解 注解在注解上面的注解称为元注解。主要有以下五种。 Retention 表明注解存活时间 Documented 将注解元素放到Javadoc文档中 Target 注解可以使用到的地方 在ElementType[]中主要有以下几种类型 TYPE:类型(比如类、注解、枚举) FIELD&…