【Megatron-DeepSpeed】张量并行工具代码mpu详解(一):并行环境初始化

news2024/11/27 9:54:49

相关博客
【Megatron-DeepSpeed】张量并行工具代码mpu详解(一):并行环境初始化
【深度学习】【分布式训练】DeepSpeed:AllReduce与ZeRO-DP
【深度学习】混合精度训练与显存分析
【深度学习】【分布式训练】Collective通信操作及Pytorch示例
【自然语言处理】【大模型】大语言模型BLOOM推理工具测试
【自然语言处理】【大模型】GLM-130B:一个开源双语预训练语言模型
【自然语言处理】【大模型】用于大型Transformer的8-bit矩阵乘法介绍
【自然语言处理】【大模型】BLOOM:一个176B参数且可开放获取的多语言模型

Megatron-DeepSpeed:并行环境初始化

​ Megatron-DeepSpeed是DeepSpeed版本的NVIDIA Megatron-LM。像BLOOM、GLM-130B等主流大模型都是基于Megatron-DeepSpeed开发的。这里以BLOOM版本的Megetron-DeepSpeed为例,介绍其张量并行代码mpu的细节(位于megatron/mpu下)。

​ 理解该部分的代码需要对张量并行的原理以及集合通信有一定的理解,可以看文章:

  • 【深度学习】【分布式训练】Collective通信操作及Pytorch示例
  • 【深度学习】【分布式训练】一文捋顺千亿模型训练技术:流水线并行、张量并行和3D并行
  • 【深度学习】【分布式训练】DeepSpeed:AllReduce与ZeRO-DP

阅读建议:

  1. 本文仅会解析核心代码,并会不介绍所有代码;
  2. 本文会提供一些测试脚本来展现各部分代码的功能;
  3. 建议实际动手实操来加深理解;
  4. 建议对Collective通信以及分布式模型训练有一定理解,再阅读本文;

一、总览

​ mpu目录下核心文件有:

  • initialize.py:负责数据并行组、张量并行组和流水线并行组的初始化,以及获取与各类并行组相关的信息;
  • data.py:实现张量并行中的数据广播功能;
  • cross_entropy.py:张量并行版本的交叉熵;
  • layers.py:并行版本的Embedding层,以及列并行线性层和行并行线性层;
  • mappings.py:用于张量并行的通信操作;

二、初始化原理

​ Megatron-Deepspeed框架能够支持3D并行,而3D并行中显卡如何分配成不同的组等工作就是由mpu目录下的initialize.py完成的。

在这里插入图片描述

​ 假设有两个节点Node1Node2,每个节点有8个GPU,共计16个GPU。16个GPU的编号分别为Rank0、Rank1、…、Rank15。此外,假设用户设置流水线并行度为4,张量并行度为2

  • 流水线并行

    流水线并行会将整个模型划分为4份,这里称为sub_model_1至sub_model_4。每连续的4张GPU负责一个sub_model。即上图右上角中,相同颜色的GPU负责相同的sub_model

    每个流水线并行组对应一个完整的模型,最终的流水线并行组:

    [Rank0, Rank4, Rank8, Rank12],
    
    [Rank1, Rank5, Rank9, Rank13],
    
    [Rank2, Rank6, Rank10, Rank14],
    
    [Rank3, Rank7, Rank11, Rank15],
    
  • 张量并行

    张量并行会针对流水线并行中的sub_model来进行张量的拆分。即Rank0和Rank1负责一份sub_model_1,Rank2和Rank3负责另一份sub_model_1;Rank4和Rank5负责sub_model_2,Rank6和Rank7负责另一份sub_model_2;以此类推。上图右下角中,绿色线条表示单个张量并行组,每个张量并行组都共同负责某个具体的sub_model。

    最终的张量并行组:

    [Rank0, Rank1], [Rank2, Rank3],
    
    [Rank4, Rank5], [Rank6, Rank7],
    
    [Rank8, Rank9], [Rank10, Rank11],
    
    [Rank12, Rank13], [Rank14, Rank15],
    
  • 数据并行

    数据并行的目的是要保证并行中的相同模型参数读取相同的数据。经过流水线并行和张量并行后,Rank0和Rank2负责相同的模型参数,所以Rank0和Rank2是同一个数据并行组。上图左上角中的红色线条表示数据并行组。

    [Rank0, Rank2], [Rank1, Rank3],
    
    [Rank4, Rank6], [Rank5, Rank7],
    
    [Rank8, Rank10], [Rank9, Rank11],
    
    [Rank12, Rank14], [Rank13, Rank15],
    

三、初始化代码

​ 这里不对代码的所有细节进行解析。理解上面的原理后,细节部分只要花些时间即可弄明白。这里仅对整体的代码结构或者某些有代表性的函数进行说明

​ 总的来说,初始化的目标就是要赋予下面这些变量具体的值。

# Intra-layer model parallel group that the current rank belongs to.
_TENSOR_MODEL_PARALLEL_GROUP = None
# Inter-layer model parallel group that the current rank belongs to.
_PIPELINE_MODEL_PARALLEL_GROUP = None
# Model parallel group (both intra- and pipeline) that the current rank belongs to.
_MODEL_PARALLEL_GROUP = None
# Embedding group.
_EMBEDDING_GROUP = None
# Data parallel group that the current rank belongs to.
_DATA_PARALLEL_GROUP = None

_VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK = None
_VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE = None

# These values enable us to change the mpu sizes on the fly.
_MPU_TENSOR_MODEL_PARALLEL_WORLD_SIZE = None
_MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE = None
_MPU_TENSOR_MODEL_PARALLEL_RANK = None
_MPU_PIPELINE_MODEL_PARALLEL_RANK = None

# A list of global ranks for each pipeline group to ease calculation of the source
# rank when broadcasting from the first or last pipeline stage
_PIPELINE_GLOBAL_RANKS = None

​ 初始化功能的核心是函数 initialize_model_parallel,其主要的参数就是用户指定的张量并行数tensor_model_parallel_size_以及流水线并行数pipeline_model_parallel_size_。基于这两个用户参数来计算出各种分组。

def initialize_model_parallel(tensor_model_parallel_size_=1,
                              pipeline_model_parallel_size_=1,
                              virtual_pipeline_model_parallel_size_=None):
    ...

​ 其余的函数主要功能是:获得初始化状态、获得当前rank所在的组、获得当前进行所在特定组里的rank等。这里功能会在后续的测试代码里展示具体的应用。

​ 此外,官方代码中的函数 destroy_model_parallel中应该是存在bug,下面是可正常运行的版本:

def destroy_model_parallel():
    """Set the groups to none."""
    global _TENSOR_MODEL_PARALLEL_GROUP
    _TENSOR_MODEL_PARALLEL_GROUP = None
    global _PIPELINE_MODEL_PARALLEL_GROUP
    _PIPELINE_MODEL_PARALLEL_GROUP = None
    global _DATA_PARALLEL_GROUP
    _DATA_PARALLEL_GROUP = None
    # 以下为新增
    global _MODEL_PARALLEL_GROUP
    _MODEL_PARALLEL_GROUP = None
    global _EMBEDDING_GROUP
    _EMBEDDING_GROUP = None

四、测试代码

1. 测试设置

  • 测试使用8张GPU;
  • 张量并行度为2,流水线并行度为2;
  • 依据上面介绍的原理,流水线并行组为:[Rank0, Rank4],[Rank1, Rank5],[Rank2, Rank6], [Rank3, Rank7]
  • 依据上面介绍的原理,张量并行组为:[Rank0, Rank1],[Rank2, Rank3],[Rank4,Rank5],[Rank6,Rank7]
  • 依据上面介绍的原理,数据并行组为: [Rank0, Rank2],[Rank1, Rank3],[Rank4,Rank6],[Rank5,Rank7]

2. 辅助代码

这里的辅助代码仍然使用原始单元测试的代码,即/megatron/mpu/tests/commons.py。 下面会对这些代码进行简单的注释。

# commons.py
import sys
sys.path.append("..")

import argparse
import os
import random
import numpy
import torch

import megatron.mpu as mpu # 由于本文测试代码位于项目的根目录下,因此修改了mpu的import方式


class IdentityLayer(torch.nn.Module):
    """
    一个单层网络,会在测试cross_entropy.py时使用。
    """
    def __init__(self, size, scale=1.0):
        super(IdentityLayer, self).__init__()
        self.weight = torch.nn.Parameter(scale * torch.randn(size))

    def forward(self):
        return self.weight


def set_random_seed(seed):
    """Set random seed for reproducability."""
    random.seed(seed)
    numpy.random.seed(seed)
    torch.manual_seed(seed)
    mpu.model_parallel_cuda_manual_seed(seed)


def initialize_distributed(backend='nccl'):
    """初始化分布式环境"""
    # Get local rank in case it is provided.
    parser = argparse.ArgumentParser()
    parser.add_argument('--local_rank', type=int, default=None,
                        help='local rank passed from distributed launcher')
    args = parser.parse_args()
    local_rank = args.local_rank

    # Get rank and world size.
    rank = int(os.getenv('RANK', '0')) # 当前进程所对应的rank(rank是显卡所对应的全局编号)
    world_size = int(os.getenv("WORLD_SIZE", '1')) # world_size是指所有可用显卡的数量

    print('> initializing torch.distributed with local rank: {}, '
          'rank: {}, world size: {}'.format(local_rank, rank, world_size))

    # Set the device id.
    device = rank % torch.cuda.device_count()
    if local_rank is not None:
        device = local_rank
    torch.cuda.set_device(device)

    # 初始化分布式环境所需要的相关代码
    init_method = 'tcp://'
    master_ip = os.getenv('MASTER_ADDR', 'localhost')
    master_port = os.getenv('MASTER_PORT', '6000')
    init_method += master_ip + ':' + master_port
    torch.distributed.init_process_group(
        backend=backend, # 使用gpu时,backend最好选择nccl
        world_size=world_size,
        rank=rank,
        init_method=init_method)
        
def print_separator(message):
    """
    输出辅助函数
    """
    torch.distributed.barrier() # 保证所有进程在此处保存同步,主要是为了防止多进行输出混乱
    filler_len = (78 - len(message)) // 2
    filler = '-' * filler_len
    string = '\n' + filler + ' {} '.format(message) + filler
    if torch.distributed.get_rank() == 0:
        print(string, flush=True)
    torch.distributed.barrier()

3. 测试代码

3.1 测试代码

​ 为了展示各个函数的功能,这里并不使用原始项目中的测试代码,而是单独撰写的代码。

# test_initialize.py
import sys
sys.path.append("..")

from commons import print_separator
from commons import initialize_distributed
import megatron.mpu as mpu
import torch

def run_test(
        tensor_model_parallel_size: int,
        pipeline_model_parallel_size:int):
    print_separator(f'> Test: TP={tensor_model_parallel_size}, PP={pipeline_model_parallel_size}')
    mpu.initialize_model_parallel(
            tensor_model_parallel_size,
            pipeline_model_parallel_size) # 并行初始化
    world_size = torch.distributed.get_world_size() # world_size, 总GPU数量
    global_rank = torch.distributed.get_rank() # 当前GPU的编号
    tp_world_size = mpu.get_tensor_model_parallel_world_size() # 每个张量并行组中GPU的数量
    pp_world_size = mpu.get_pipeline_model_parallel_world_size() # 每个流水线并行组中GPU的数量
    dp_world_size = mpu.get_data_parallel_world_size() # 每个数据并行组中的GPU数量
    tp_rank = mpu.get_tensor_model_parallel_rank() # 在张量并行组中的编号
    pp_rank = mpu.get_pipeline_model_parallel_rank() # 在流水线并行组中的编号
    dp_rank = mpu.get_data_parallel_rank() # 在数据并行组中的编号
    tp_group = mpu.get_tensor_model_parallel_group()
    tp_group = torch.distributed.distributed_c10d._pg_group_ranks[tp_group] # 当前GPU所在张量并行组的映射字典
    pp_group = mpu.get_pipeline_model_parallel_group()
    pp_group = torch.distributed.distributed_c10d._pg_group_ranks[pp_group] # 当前GPU所在流水线并行组的映射字典
    dp_group = mpu.get_data_parallel_group()
    dp_group = torch.distributed.distributed_c10d._pg_group_ranks[dp_group] # 当前GPU所在数据并行组的映射字典
    torch.distributed.barrier()
    info = f"="*20 + \
            f"\n> global_rank={global_rank}\n" + \
            f"> world_size={world_size}\n" + \
            f"> tp_world_size={tp_world_size}\n" + \
            f"> pp_world_size={pp_world_size}\n" + \
            f"> dp_world_size={dp_world_size}\n" + \
            f"> tp_rank={tp_rank}\n" + \
            f"> pp_rank={pp_rank}\n" + \
            f"> dp_rank={dp_rank}\n" + \
            f"> tp_group={tp_group}\n" + \
            f"> pp_group={pp_group}\n" + \
            f"> dp_group={dp_group}\n"
    print(info, flush=True)
    torch.distributed.barrier()


if __name__ == '__main__':
    initialize_distributed() # 初始化分布式环境
    tensor_model_parallel_size = 2
    pipeline_model_parallel_size = 2
    run_test(tensor_model_parallel_size, pipeline_model_parallel_size)

3.2 启动脚本

deepspeed test_initialize.py

3.3 输出结果

在这里插入图片描述

​ 上图是Rank5的相关信息,符合前面对于其张量并行组、流水线并行组和数据并行组的分析

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/720241.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DOM“文档对象模型”

目录 DOM 1.节点层级 1.2.节点 常用节点 文档节点(document) 元素节点(Element) 属性节点(Attribute) 文本节点(Text) 其他节点 DocumentType Comment DocumentFragment 1.3.节点树…

力扣 501. 二叉搜索树中的众数

题目来源&#xff1a;https://leetcode.cn/problems/find-mode-in-binary-search-tree/description/ C题解1&#xff1a;使用中序遍历将二叉搜索树有小到大放进数组中&#xff0c;在寻找最大数组。 class Solution { public:void traversal(TreeNode* node, vector<int>…

操作系统7:进程死锁

目录 1、死锁概述 &#xff08;1&#xff09;计算机中的资源分类 1.1 - 可重用性资源和消耗性资源 1.2 - 可抢占性资源和不可抢占性资源 &#xff08;2&#xff09;计算机系统中的死锁 &#xff08;3&#xff09;死锁的定义、必要条件和处理方法 3.1 - 死锁的定义 3.2 …

运输层:TCP拥塞控制

1.运输层&#xff1a;TCP拥塞控制 笔记来源&#xff1a; 湖科大教书匠&#xff1a;TCP的拥塞控制 声明&#xff1a;该学习笔记来自湖科大教书匠&#xff0c;笔记仅做学习参考 网络资源&#xff1a;在计算机网络中的链路容量&#xff08;即带宽&#xff09;、交换结点中的缓存和…

windows 10 安装Python 3.10版

一、查找官网 1.python官网需要VPN(建立一个临时的、安全的连接&#xff0c;是一条穿过混乱的公用网络的安全、稳定的隧道。)&#xff0c;打开VPN进行代理之后&#xff0c;打开网址&#xff1a;https://www.python.org/ 二、查找所需下载的系统及版本 1.本机是winodws操作系统&…

计算机网络_ 1.3 网络核心 (数据交换_报文、分组交换)

计算机网络_数据交换_报文、分组交换 报文交换&#xff08;message switching&#xff09;分组交换&#xff08;package switching&#xff09;存储-转发&#xff08;store-and-forward&#xff09;传输延迟分组交换的报文交付时间 分组交换 vs 电路交换 报文交换&#xff08;m…

zabbix监控部署(都是导图!)

目录 一&#xff1a;监控软件的作用 二&#xff1a;zabbix 简介 1、zabbix 概述 2、zabbix 监控原理 3、 Zabbix 6.0 新特性 &#xff08;1&#xff09;Zabbix server高可用防止硬件故障或计划维护期的停机 &#xff08;2&#xff09;Zabbix 6.0 LTS新增Kubernetes监控功…

C语言入门篇(二)

前言   续接上一篇文章&#xff0c;此篇继续对&#xff23;语言的基础知识点进行更新。 C语言入门篇&#xff08;一&#xff09; 初始C语言 7. 选择语句8. 循环语句9. 函数9.1 函数的定义9.2 函数的调用 10. 数组10.1 数组定义10.2 数组的下标10.3 数组的使用 11. 操作符12.…

JAVA学习(九)

1.java锁 1.1 AtomicInteger 首先说明&#xff0c;此处 AtomicInteger&#xff0c;一个提供原子操作的 Integer 的类&#xff0c;常见的还有 AtomicBoolean、AtomicInteger、AtomicLong、AtomicReference 等&#xff0c;他们的实现原理相同&#xff0c; 区别在与运算对象类型…

搭建新版security-oauth2协议,流程代码详解,源码分析

前言&#xff1a;最近在学习搭建oauth2协议的开放平台&#xff0c;把搭建框架时的思路以及遇到的问题记录下来。 文章会持续更新&#xff0c;前期可能会比较零碎&#xff0c;最后会整合一起&#xff0c;写一篇从部署到使用、踩坑、依赖版本解决等完整文章。 使用的是Spring S…

【深度学习】深度强化学习初学者指南

一、说明 GAN&#xff08;Generative Adversarial Networks&#xff09;是一种深度学习模型&#xff0c;它由两个神经网络组成&#xff1a;一个生成网络和一个判别网络。生成网络学习如何生成类似于给定数据集的新数据&#xff0c;而判别网络则学习如何区分生成网络生成的数据和…

Linux下的基本指令

Linux下的基本指令 操作系统的概念01.adduser指令02.userdel指令03.pwd指令04.ls指令05.tree指令06.cd指令07.touch指令08.mkdir指令&#xff08;重要&#xff09;09.rmdir指针&&rm指令&#xff08;重要&#xff09;10.man指针&#xff08;重要&#xff09;11.cp指令&a…

【macOS 系列】如何调整启动台图标大小和行数

1、使用指令&#xff0c;这是隐藏的技巧&#xff0c;在控制台输入如下指令 defaults write com.apple.dock springboard-rows -int 6 defaults write com.apple.dock springboard-columns -int 8 defaults write com.apple.dock ResetLaunchPad -bool TRUE killall Dock以上表…

基于Web的智慧交通3D可视化系统

前言 城市交通是城市社会活动、经济活动的纽带和动脉&#xff0c;智慧交通系统对城市经济发展和人民生活水平起着极其重要的作用。 背景 随着我国城市化进程不断加快&#xff0c;现代城市交通问题日益受到人们的关注。特别是汽车数量的与日俱增&#xff0c;给城市带来了大量…

Socket介绍及使用Java实现socket通信

一、Socket概述 Socket&#xff08;套接字&#xff09;是计算机网络编程中用于实现网络通信的一种机制。它提供了一种编程接口&#xff0c;允许应用程序通过网络进行数据传输&#xff0c;实现不同主机之间的通信。 Socket可以看作是一种抽象的概念&#xff0c;用于描述网络通信…

性能监控软件的软件特色有哪些?

性能监控软件是一种专门用于监测和分析系统性能的软件工具。它可以帮助开发人员、运维人员和系统管理员追踪系统的性能瓶颈&#xff0c;优化系统性能&#xff0c;提供稳定可靠的服务&#xff0c;那性能监控软件的软件特色有哪些&#xff1f; 实时监控&#xff1a;性能监控软件可…

3、CCesium 第一个地图

1、将下载的CCsium库解压到指定目录如D:/install&#xff0c;里面有几个文件 bin&#xff1a;导出的dll examples&#xff1a;一个简单的示例 include&#xff1a;ccesium和第三方库的头文件 lib&#xff1a;搜索cceisum库的一些信息 share&#xff1a;一些说明 我们主要用…

allatori-JAVA代码混淆工具

Allatori是第二代Java混淆器&#xff0c;它为您的知识产权提供全方位的保护。 虽然大多数第二代混淆器都能提供值得信赖的保护&#xff0c;但我们在Allatori中开发了一些额外的功能&#xff0c;使代码的反向工程几乎不可能。 Allatori不仅仅是混淆&#xff0c;它还能最大限度…

为什么你感觉中层管理一直闲着没事干?

点击下方“JavaEdge”&#xff0c;选择“设为星标” 第一时间关注技术干货&#xff01; 免责声明~ 切记&#xff0c;任何文章不要过度深思&#xff08;任何东西都无法经得起审视&#xff0c;因为这世上没有同样的成长环境&#xff0c;也没有同样的认知水平同时也「没有适用于所…

eNSP的使用

目录 配置路由器 DHCP——动态主机配置协议 工作过程&#xff1a; 第一种场景——PC首次获取IP地址的情况 第二种场景——pc&#xff08;客户都&#xff09;再次获取IP地址的情况 DHCP配置&#xff1a; Telnet 搭建好图后 1.看广播域&#xff08;路由器有几个接口就有几…