pytorch--流水线并行

news2024/9/24 18:18:25

流水线并行(pipelining )部署实施起来非常困难,因为这需要根据模型的weights把模型分块(通常涉及到对源码的修改),此外,分布式的调度和数据流的依赖也是要考虑的点;
pipelining 库可以让部署变得更加简单;
这个库包含两个部分:
splitting frontend:此部分用于把模型分块,并且捕捉到数据流之间的关系;
distributed runtime:并行地执行pipeline stage在不同的设备上,同时处理好batch的划分、调度、通信和梯度回传;
所以这个库支持以下操作:
1.对于模型的简单划分;
2.丰富的流水线调度策略,包括GPipe, 1F1B, Interleaved 1F1B and Looped BFS;
3.支持跨主机的并行;
4.支持一些常规的并行操作,比如data parallel (DDP, FSDP) or tensor parallel;
关于模型的splitting:
为了构建PipelineStage,需要提供包含了nn.Parameters and nn.Buffers的nn.Module,同时定义了能够执行对应stage的forward函数

class Transformer(nn.Module):
    def __init__(self, model_args: ModelArgs):
        super().__init__()

        self.tok_embeddings = nn.Embedding(...)

        # Using a ModuleDict lets us delete layers witout affecting names,
        # ensuring checkpoints will correctly save and load.
        self.layers = torch.nn.ModuleDict()
        for layer_id in range(model_args.n_layers):
            self.layers[str(layer_id)] = TransformerBlock(...)

        self.output = nn.Linear(...)

    def forward(self, tokens: torch.Tensor):
        # Handling layers being 'None' at runtime enables easy pipeline splitting
        h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens

        for layer in self.layers.values():
            h = layer(h, self.freqs_cis)

        h = self.norm(h) if self.norm else h
        output = self.output(h).float() if self.output else h
        return output

用这种方式定义的模型可以很容易配置stage和初始化,(为了防止OMM error使用meta device),删除对应stage不需要的层,然后构造PipelineStage 来wrap model;

with torch.device("meta"):
    assert num_stages == 2, "This is a simple 2-stage example"

    # we construct the entire model, then delete the parts we do not need for this stage
    # in practice, this can be done using a helper function that automatically divides up layers across stages.
    model = Transformer()

    if stage_index == 0:
        # prepare the first stage model
        del model.layers["1"]
        model.norm = None
        model.output = None

    elif stage_index == 1:
        # prepare the second stage model
        model.tok_embeddings = None
        del model.layers["0"]

    from torch.distributed.pipelining import PipelineStage
    stage = PipelineStage(
        model,
        stage_index,
        num_stages,
        device,
        input_args=example_input_microbatch,
    )

这里还提供自动切分模型的接口函数,这里不做细致赘述;
其中input_args 代表执行时候的input data samples,这个要拿去经过forward去确定输入输出的shapes;当同时使用其他并行trick的时候,output_args 也需要的,因为模型输出大小可能会受到影响;
第一步:构建一个执行的PipelineStage
PipelineStage用于分配通信内存,创造发送、接受操作去通信;它用来存储还未被consume的forward的缓存,同时为stage model执行backward;
PipelineStage需要知道输入输出的shape大小,方便创建通信缓存,shapes必须是固定大小的,也就是训练执行的时候它不能是变化的;
每一个stage model必须是nn.Module的格式;(所以第一步要做的事情就是手动分割模型);
当然也有其他替代方式,可以用图分割去把你的模型自动分割为一系列的nn.Module,这个要求模型必须是torch.Export traceable ;所以能手动更改模型代码是最方便的;
第二步:用PipelineSchedule 去执行
以下是执行的示例代码:

from torch.distributed.pipelining import ScheduleGPipe

# Create a schedule
schedule = ScheduleGPipe(stage, n_microbatches)

# Input data (whole batch)
x = torch.randn(batch_size, in_dim, device=device)

# Run the pipeline with input `x`
# `x` will be divided into microbatches automatically
if rank == 0:
    schedule.step(x)
else:
    output = schedule.step()

以上代码的rank应该指的是进程号,也就是在0进程中,执行stage1,在1进程中执行stage 2;

以下是官方给的关于llama流水线并行的示例代码,会更加清晰明了;

# $ torchrun --nproc-per-node 4 pippy_llama.py
import os
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from torch.distributed.pipelining import SplitPoint, pipeline, ScheduleGPipe

# Grab the model
llama = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Llama-2-7b-chat-hf", low_cpu_mem_usage=True
)
print(llama)

tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-chat-hf")
tokenizer.pad_token = tokenizer.eos_token
mb_prompts = (
    "How do you", "I like to",
)  # microbatch size = 2

rank = int(os.environ["RANK"])
world_size = int(os.environ["WORLD_SIZE"])
device = torch.device(f"cuda:{rank % torch.cuda.device_count()}")
torch.distributed.init_process_group(rank=rank, world_size=world_size)

llama.to(device).eval()

# Cut model by equal number of layers per rank
layers_per_rank = llama.config.num_hidden_layers // world_size
print(f"layers_per_rank = {layers_per_rank}")
split_spec = {
    f"model.layers.{i * layers_per_rank}": SplitPoint.BEGINNING
    for i in range(1, world_size)
}

# Create a pipeline representation from the model
mb_inputs = tokenizer(mb_prompts, return_tensors="pt", padding=True).to(device)
pipe = pipeline(llama, mb_args=(mb_inputs["input_ids"],))

# Create pipeline stage for each rank
stage = pipe.build_stage(rank, device=device)

# Run time inputs
full_batch_prompts = (
    "How do you", "I like to", "Can I help", "You need to",
    "The weather is", "I found a", "What is your", "You are so",
)  # full batch size = 8
inputs = tokenizer(full_batch_prompts, return_tensors="pt", padding=True).to(device)

# Attach to a schedule
# number of microbatches = 8 // 2 = 4
num_mbs = 4
schedule = ScheduleGPipe(stage, num_mbs)

# Run
if rank == 0:
    args = inputs["input_ids"]
else:
    args = None

output = schedule.step(args)

# Decode
if output is not None:
    next_token_logits = output[0][:, -1, :]
    next_token = torch.argmax(next_token_logits, dim=-1)
    print(tokenizer.batch_decode(next_token))

torchrun --nproc-per-node 8 pippy_llama.py

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2161065.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

QT----基于QML的计时器

赶上了实习的末班车,现在在做QML开发,第一天的学习成果,一个计时器.逻辑挺简单的,纯QML实现,代码在仓库QT-Timer 学习使用c的listmodel 学习使用了如何用c的listmodel来存储数据. 新建一个TImeListModel类继承自QAbstractListModel class TimeListModel : public QAbstrac…

AIGC基础工具-科学计算和数据处理的重要库NumPy(Numerical Python)简介

文章目录 1. NumPy 的核心概念1.1 ndarray:多维数组对象示例代码 2. NumPy 的数据类型 (dtype)示例代码 3. NumPy 的数组创建方法3.1 使用 array() 创建数组3.2 使用 zeros() 和 ones()3.3 使用 arange() 和 linspace()3.4 使用 random 模块生成随机数组 4. NumPy 数…

AOT源码解析4.3-model主体解析

1.添加参考图像(add_reference_frame) 1.1 生成位置编码和ID编码 具体操作见详情。 图1:如图所示,显示的是参考图像的位置编码和id编码的生成过程。对于id编码,将mask图像输入进conv2d卷积网络后,进行结…

容器化安装Jenkins部署devops

基础环境介绍 系统使用的是centos7.9 内核使用的是5.16.13-1.el7.elrepo.x86_64 容器使用的是26.1.4 docker-compose使用的是 v2.29.0 链路图 devops 配置git环境插件 部署好jenkins后开始配置 jenkins连接git,这里需要jenkins有连接git的插件。在已安装的插件…

【SD教程】图片也能开口说话?别惊讶!用SadTalker插件,一键生成自己的数字人,本地部署,免费使用!(附资料)

最近数字人越来越火,连互联网大佬都纷纷下场,比如360的周鸿祎,京东的刘强东等等。小伙伴可能也想拥有自己的数字人如果想用最简单的方式,那么可以用第三方的网站,例如 HeyGen平台、腾讯的智影等等。可这些网站都是收费…

HFSS中看TDR波形详细设置以及相关的解释

时域反射测量(TDR)中心思想就是用阶跃函数作为激励,应用在模型上,并检查反射随时间的变化。在检查时域之前,必须对driven solution(Modal、Terminal或Transient)执行插值扫描。然后,…

vite分目录打包以及去掉默认的.gz 文件

1.vite打包情况介绍: 1.1vite在不进行任何配置的情况下,会将除开public的所有引用到资源打包编译添加哈希值至assets文件夹中(非引用文件以及行内样式图片未被打包编译资源会被treeSharp直接忽略不打包),     1.2w…

阿里云函数计算 x NVIDIA 加速企业 AI 应用落地

作者:付宇轩 前言 阿里云函数计算(Function Compute, FC)是一种无服务器(Serverless)计算服务,允许用户在无需管理底层基础设施的情况下,直接运行代码。与传统的计算架构相比,函数…

极星Polestar EDI 项目案例

近期国内汽车行业供应商J公司收到了极星Polestar的邀请,需要通过EDI与其国内工厂传输业务数据。本案例将为大家介绍对接过程以及实施方案。 梳理需求文档 极星Polestar的EDI需求与Volvo一样,传输协议选择 OFTP,报文标准为EDIFACT&#xff0…

Swing模拟银行柜台系统

> 这是一个基于JavaSwing实现的模拟银行柜台系统。 > 具有管理员、柜员、客户三种登录角色。 > 支持开户、注册、存取款、转账、汇款、账单查询等功能。 > 本项目适合JAVA初学者作为入门学习项目。 一、部分界面演示 二、基础依赖 技术/框架版本描述Java11编…

Vue前端浏览器指纹获取:数字世界的身份密码

程序员必备宝典https://tmxkj.top/#/一个开源的JavaScript库,它通过收集用户浏览器的多种属性(如屏幕分辨率、浏览器插件、字体、Canvas和WebGL等)来生成一个独特的浏览器指纹,用于识别和追踪用户。 #Github地址 GitHub - finger…

Uniapp时间戳转时间显示/时间格式

使用uview2 time 时间格式 | uView 2.0 - 全面兼容 nvue 的 uni-app 生态框架 - uni-app UI 框架 <text class"cell-tit clamp1">{{item.create_time}} --- {{ $u.timeFormat(item.create_time, yyyy-mm-dd hh:MM:ss)}} </text>

apply、call和bind的作用和区别

apply与call 首先介绍一下apply与call&#xff0c;因为这两个方法的功能和使用方式都差不多&#xff0c;只是传参的方式不同。call和apply的作用都是改变函数运行时的上下文&#xff08;context&#xff09; 语法 fun.call(thisArg, arg1, arg2, ...)fun.apply(thisArg, arg…

类的难疑点

一、知识点 1、类的属性和对象属性&#xff08;实例属性&#xff09; shuxing"123" self.shuxing"123" 2、类的对象 self.loginMyclass() loginMyclass() 3、访问类属性和方法的操作 通过“类名.属性”访问&#xff1a;Myclass.shuxing 通…

详解常见排序

目录 ​编辑 插入排序 希尔排序&#xff08;缩小增量排序&#xff09; 选择排序 冒泡排序 堆排序 快速排序 hoare版 挖坑法 前后指针法 非递归版 归并排序 递归版 非递归版 计数排序 声明&#xff1a;以下排序代码由Java实现&#xff01;&#xff01;&#xff01…

【研赛D题成品论文】24华为杯数学建模研赛D题成品论文(第一问)+可运行代码丨免费分享

2024华为杯研究生数学建模竞赛D题精品成品论文已出&#xff01; D题 大数据驱动的地理综合问题 一、问题分析 问题一&#xff1a;目标&#xff1a;利用1990-2020年的数据&#xff0c;针对降水量和土地利用的时空演化特征进行描述。数据&#xff1a;两个核心变量&#xff0c;一…

电商效果图渲染神器:轻松高效出图

在这个电商行业飞速发展的今天&#xff0c;离不开商品图的效果。而电商效果图同样离不开渲染&#xff0c;而大量的渲染需求有需要大量的机器&#xff0c;还要追求更快的渲染速度和更稳定的性能。毕竟&#xff0c;谁不想快点完成项目又省心呢&#xff1f; 而云渲染服务是个很好…

C++之STL—deque容器

双端数组 区别于 vector (单端数组)&#xff0c; 构造函数 注意&#xff1a;读取数据时&#xff0c;const修饰保证函数内只能读取&#xff0c;不能修改数据 void print(const deque<int>& deq) {for (deque<int>::const iterator it deq.begin(); it ! deq.e…

使用 Nuxt Kit 的构建器 API 来扩展配置

title: 使用 Nuxt Kit 的构建器 API 来扩展配置 date: 2024/9/24 updated: 2024/9/24 author: cmdragon excerpt: 摘要:本文详细介绍了如何使用 Nuxt Kit 的构建器 API 来扩展和定制 Nuxt 3 项目的 webpack 和 Vite 构建配置,包括扩展Webpack和Vite配置、添加自定义插件、…

正向科技|格雷母线定位系统的设备接线安装示范

格雷母线安装规范又来了&#xff0c;这次是设备接线步骤 格雷母线是格雷母线定位系统的核心部件&#xff0c;沿着移动机车轨道方向上铺设&#xff0c;格雷母线以相互靠近的扁平状电缆与天线箱电磁偶合来进行信号传递&#xff0c;从而检测得到天线箱在格雷母线长度方向上的位置。…