深度学习:基于MindSpore NLP的数据并行训练

news2025/1/3 2:10:30

什么是数据并行?

数据并行(Data Parallelism, DP)的核心思想是将大规模的数据集分割成若干个较小的数据子集,并将这些子集分配到不同的 NPU 计算节点上,每个节点运行相同的模型副本,但处理不同的数据子集。

1. 将数据区分为不同的mini-batch

将数据集切分为若干子集,每个mini-batch又不同的设备独立处理。如你有4个GPU,可以把数据集分为4分,每个GPU处理一个字数据集。

2. 模型参数同步

可以通过某一进程初始化全部模型参数后,向其他进程广播模型参数,实现同步。

3. 前向运算

每个设备独立计算前向运算。

4. 反向运算

每个设备计算损失的梯度。

5. 梯度聚合

当所有设备计算完各自的梯度后,对所有设备的梯度取平均,每个设备的模型参数根据平均梯度进行更新。

6. 参数更新

数据并行的参数更新是在数据切分、模型参数同步后进行的。更新前,每个进程的参数相同;更新时,平均梯度相同;故更新后,每个进程的参数也相同。

数据并行的基本操作

Reduce 归约

归约是函数式编程的概念。数据归约包括通过函数将一组数字归约为较小的一组数字。

如sum([1, 2, 3, 4, 5])=15, multiply([1, 2, 3, 4, 5])=120。

AllReduce

等效于执行Reduce操作后,将结果广播分配给所有进程。

MindSpore AllReduce

import numpy as np
from mindspore.communication import init
from mindspore.communication.comm_func import all_reduce
from mindspore import Tensor

init()
input_tensor = Tensor(np.ones([2, 8]).astype(np.float32))
output = all_reduce(input_tensor)

数据并行的主要计算思想

Parameter-Server

主要思想:所有node被分为server node和worker node

Server node:负责参数的存储和全局的聚合操作

Worker node:负责计算 

Parameter-Server的问题:

  • 假设有N=5张卡,GPU0作为Server,其余作为Worker
  • 将大小为K的数据拆分为N-1份,分给每个Worker GPU
  • 每个GPU计算得到local gradients
  • N-1块GPU将计算所得的local gradients发送给GPU 0
  • GPU 0对所有local gradients进行all reduce操作得到全局梯度,参数更新
  • 将该新模型的参数返回给每张GPU

假设单个Worker到Server的通信开销为C,那么将local gradients送到GPU 0上的通信成本为C * (N - 1)。收到GPU 0通信带宽的影响,通信成本随着设备数的增加而线性增长。

Pytorch DataParallel 

Pytorch DP在Parameter-Server的基础上,把GPU 0即当作Server也当作Worker。

1. 切分数据,但不切分Label

每个GPU进行正向计算之后,将正向计算结果聚合回GPU 0计算Loss,GPU 0计算完Loss的gradient之后,将梯度分发回其他worker GPU。随后各个GPU计算整个模型的grad,再将grad聚合回GPU 0,进行AllReduce。

2. 切分数据,同时切分Label 

每张卡自己计算Loss即可,减少一次聚合操作。

Pytorch DataParallel 问题:

1. 为摆脱Parameter-Server模式,性能差。

2. 需要额外的GPU进行梯度聚合/ GPU 0需要额外的显存。GPU 0限制了其他GPU的上限。

Ring AllReduce

每张卡单向通讯,通讯开销一定。

每张卡占用的显存相同。

第一步:Scatter-Reduce 

假设每张卡上各自计算好了梯度。

每张GPU依次传值: 

重复直至: 

第二步:All-Gather

将每一个累计值a / b / c逐个发送至个张卡

直至每张卡都有每层的梯度累计值。

两步分别做了四次通讯,便可以实现并行计算。

Ring AllReduce计算开销

  • N-1次Scatter-Reduce
  • N-1次All-Gather
  • 每个GPUGPU一次通讯量为:K/N,K为总数据大小
  • 每个GPU通信次数为:2(N-1)

总通信量为:2(N-1)*(K/N)

当N足够多时,通信量为一个常数2K。

Gradient Bucketing

集合通信在大张量上更有效。因此,可以在短时间内等待并将多个梯度存储到一个数据桶(Bucket),然后进行AllReduce操作。而不是对每个梯度立刻启动AllReduce操作。

MindSpore数据并行 

def forward_fn(data, target):
    logits = net(data)
    loss = loss_fn(logits, target)
    return loss, logits

grad_fn = ms.value_and_grad(forward_fn, None, net_trainable_param(), has_aux=True)
# 初始化reducer
grad_gather = nn.DistributedGradReducer(optimizer.parameters)

for epoch in range(10):
    i = 0
    for image, label in data_set:
        (loss_value, _), grads = grad_fn(image, label)
        # 进行通讯
        grads = grad_reducer(grads)
        optimizer(grads)
        # ...

MindNLP数据并行

def update_gradient_by_distributed_type(self, model: nn.Module) -> None:
    '''update gradient by distributed_type'''
    if accelerate_distributed_type == DistributedType.NO:
        return
    if accelerate_distributed_type == DistrivutedType.MULTI_NPU:
        from mindspore.communication import get_group_size
        from mindspore.communication.comm_func iport all_reduce
        rank_size = get_group_size()
        for parameter in model.parameters():
            # 进行all_reduce
            new_grads_mean = all_reduce(parameter.grad) / rank_size
            parameter.grad = new_grads_mean

数据并行的局限性

要求单卡可以放下模型

多卡训练时内存冗余,相同模型参数复制了多份。

MindSopre中的数据并行

1. 在启智社区创建云脑任务或华为云创建notebook

环境选择:mindspore==2.3.0, cann==8.0,昇腾910 * 2

 2. 更新MindSpore框架版本

pip install --upgrade mindspore

同时可以查看NPU信息:

npu--smi info

3. 配置项目环境

克隆mindnlp项目

git clone https://github.com/mindspore-lab/mindnlp.git

下载mindnlp

cd mindnlp
bash scripts/build_and_reinstall.sh

下载完成后,卸载mindformers、soundfile

pip uninstall mindformers

4. 运行训练脚本

cd mindnlp/llm/parallel/bert_imdb_finetune_dp
msrun --worker_num=2 --local_worker_num=2 --master_port=8118 bert_imdb_finetune_cpu_mindnlp_trainer_npus_same.py 

发现两个NPU都被占用 

日志文件开始记录模型训练进度 

成功实现数据并行! 

基于MindSpore微调Roberta+数据并行

数据集:imdb影评数据集

微调代码:roberta.py

#!/usr/bin/env python
# coding: utf-8
"""
unset MULTI_NPU && python bert_imdb_finetune_cpu_mindnlp_trainer_npus_same.py
bash bert_imdb_finetune_npu_mindnlp_trainer.sh
"""
import mindspore.dataset as ds
from mindnlp.dataset import load_dataset

# loading dataset
imdb_ds = load_dataset('imdb', split=['train', 'test'])
imdb_train = imdb_ds['train']
imdb_test = imdb_ds['test']
 
imdb_train.get_dataset_size()
 
import numpy as np
 
def process_dataset(dataset, tokenizer, max_seq_len=512, batch_size=4, shuffle=False):
    is_ascend = mindspore.get_context('device_target') == 'Ascend'
    def tokenize(text):
        if is_ascend:
            tokenized = tokenizer(text, padding='max_length', truncation=True, max_length=max_seq_len)
        else:
            tokenized = tokenizer(text, truncation=True, max_length=max_seq_len)
        return tokenized['input_ids'], tokenized['attention_mask']
 
    if shuffle:
        dataset = dataset.shuffle(batch_size)
 
    # map dataset
    dataset = dataset.map(operations=[tokenize], input_columns="text", output_columns=['input_ids', 'attention_mask'])
    dataset = dataset.map(operations=transforms.TypeCast(mindspore.int32), input_columns="label", output_columns="labels")
    # batch dataset
    if is_ascend:
        dataset = dataset.batch(batch_size)
    else:
        dataset = dataset.padded_batch(batch_size, pad_info={'input_ids': (None, tokenizer.pad_token_id),
                                                             'attention_mask': (None, 0)})
 
    return dataset

from mindnlp.transformers import AutoTokenizer
import mindspore
import mindspore.dataset.transforms as transforms
# tokenizer
tokenizer = AutoTokenizer.from_pretrained('roberta-base')

dataset_train = process_dataset(yelp_ds_train, tokenizer, shuffle=True)
from mindnlp.transformers import AutoModelForSequenceClassification

# set bert config and define parameters for training
model = AutoModelForSequenceClassification.from_pretrained('AI-ModelScope/roberta-base', num_labels=2, mirror='modelscope')

from mindnlp.engine import TrainingArguments

training_args = TrainingArguments(
    output_dir="./",
    save_strategy="epoch",
    logging_strategy="epoch",
    num_train_epochs=3,
    learning_rate=2e-5
)

training_args = training_args.set_optimizer(name="adamw", beta1=0.8)

from mindnlp.engine import Trainer

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset_train
)

print('start training')
trainer.train()

运行命令:

msrun --worker_num=2 --local_worker_num=2 --master_port=8118 roberta.py

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2268322.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Docker Run使用方法及参数详细说明

Docker Run使用方法及参数详细说明 基本语法常用参数使用示例总结Docker Run是Docker中最基本的命令之一,用于创建并启动一个新的容器。通过Docker Run,用户可以基于指定的镜像创建一个容器实例,并且可以配置容器的各种参数,如网络设置、存储选项等。下面将详细介绍Docker …

QTDemo:串口调试工具

项目简介 本项目通过QT框架设计一款可以在Windows、Linux等平台的跨平台串口助手,串口功能能够满足基本的调试需求。 本项目采用的版本为:QT5.14 visual studio 2022 进行开发。 项目源码:https://github.com/say-Hai/MyCOMDemo 项目页面&am…

【K8S问题系列 | 21 】K8S中如果PV处于Bound状态,如何删除?【已解决】

在Kubernetes(K8S)的存储管理体系中,持久卷(PersistentVolume,PV)是一种重要的资源,它为Pod提供了持久化存储能力。当PV处于Bound状态时,意味着它已经与某个持久卷声明(P…

【行业发展报告】2024大数据与智能化行业发展浅析

回首 2024,大数据智能化浪潮汹涌。海量数据宛如繁星,在智能算法的苍穹下汇聚、碰撞,释放出洞察市场与用户的强大能量,精准勾勒出商业新航线。我们精心雕琢技术架构,从数据存储的坚固基石到处理分析的高效引擎&#xff…

Mumu模拟器12开启ADB调试方法

在使用安卓模拟器进行开发或调试时,ADB(Android Debug Bridge)是一项不可或缺的工具。大多数模拟器默认开启了ADB调试功能,但在安装最新版的 Mumu模拟器12 时,可能会遇到 adb devices 无法识别设备的问题。 问题描述 …

金融租赁系统的创新发展与市场竞争力提升探讨

内容概要 随着经济的快速发展,金融租赁系统逐渐成为金融市场中不可或缺的一环。它不仅提供了灵活的资金解决方案,还促进了企业的资本结构优化与资源配置效率。因此,了解该系统的市场背景与发展现状至关重要。 在现今环境下,新兴…

SQL 实战:基于经纬度的距离计算与位置查询

在位置服务(LBS)系统中,基于地理位置查询和距离计算是核心功能之一。例如: 查找附近的商铺、加油站或医院。计算两点之间的实际直线距离。筛选出指定范围内的用户或设备位置。 MySQL 提供了多种方式实现地理位置查询&#xff0c…

DAY1牛客题库1-3算法题:C语言版本(思路仅供参考)

挑战一下7天刷完牛客题库的108个题,今天是第一天思密达~一直以来都特别懒的做题,还是得勤奋点我觉得~今天只做了3个~嘻嘻明天去玩回家多弄几个~ 1.输出字符串最后一个单词长度 【1】题目: #include"stdio.h" #include"string…

LeetCode 83 :删除排链表中的重复元素

题目: 地址:https://leetcode.cn/problems/remove-duplicates-from-sorted-list/ 方法一: 方法二: package com.zy.leetcode.LeetCode_04;/*** Author: zy* Date: 2024-12-25-15:19* Description: 删除排链表中的里复元素* …

微信流量主挑战:用户破16!新增文档转换(新纪元3)

朋友们,报告好消息!我的小程序用户数量已经涨到16个了!没错,真没拉朋友圈亲戚好友来撑场子,全靠实力(和一点点运气)吸引了16位陌生小伙伴光临!这波进步,连我自己都感动了…

Java-38 深入浅出 Spring - AOP切面增强 核心概念 相关术语 Proxy配置

点一下关注吧!!!非常感谢!!持续更新!!! 大数据篇正在更新!https://blog.csdn.net/w776341482/category_12713819.html 目前已经更新到了: MyBatis&#xff…

基于Docker+模拟器的Appium自动化测试(二)

模拟器的设置 打开“夜神模拟器”的系统设置,切换到“手机与网络”页,选中网络设置下的“开启网络连接”和“开启网络桥接模式”复选框,而后选择“静态IP”单选框,在IP地址中输入“192.168.0.105”,网关等内容不再赘述…

【从零开始入门unity游戏开发之——C#篇36】C#的out协变和in逆变如何解决泛型委托的类型转换问题

文章目录 一、知识回顾和问题分析1、回顾强制转换和as转换知识2、问题分析 二、为什么泛型委托不行?1、泛型类型的严格类型检查2、**as 和强制类型转换不能直接使用** 三、如何解决这个问题?1、**协变(out)**2、**逆变&#xff08…

深度学习使用Anaconda打开Jupyter Notebook编码

新手入门深度学习使用Anaconda打开Jupyter Notebook编码 1. 安装Anaconda 第一种是Anaconda官网下载安装包,但是很慢,不太建议 第二种使用国内清华大学镜像源下载 选择适合自己电脑的版本,支持windows,linux系统 下载完之后自行…

Linux套接字通信学习

Linux套接字通信 代码源码:https://github.com/say-Hai/TcpSocketLearn/tree/CThreadSocket 在网络通信的时候, 程序猿需要负责的应用层数据的处理(最上层),而底层的数据封装与解封装(如TCP/IP协议栈的功能)通常由操作系统、网络协…

git clone 和 conda 换源

文章目录 git clone 通过 sshconda 创建虚拟环境通过 env.yml 文件conda 换源 git clone 通过 ssh git clone ssh://用户名IP地址:/仓库名字.gitconda 创建虚拟环境通过 env.yml 文件 conda env create -f environment.ymlconda 换源 Step 1 生成 .bashrc 文件在家目录下。…

机床数据采集网关在某机械制造企业的应用

随着工业4.0时代的到来,智能制造已成为制造业转型升级的重要方向。数控机床作为现代制造业的核心设备,其运行状态和加工参数的数据实时采集与分析对于提升生产效率、优化生产流程具有关键意义。 背景概述 某机械制造企业拥有多台数控机床,这…

c# RSA加解密工具,.netRSA加解密工具

软件介绍 名称: c# RSA加解密工具,.netRSA加解密工具依赖.net版本: .net 8.0工具类型: WinForm源码下载 c# RSA加解密工具,.netRSA加解密工具 依赖项 WinFormsRSA.csproj <Project

穷举vs暴搜vs深搜vs回溯vs剪枝_全排列_子集

46. 全排列 递归解决&#xff1a;一开始选一个数&#xff0c;递归进入下一层再选一个新的数&#xff0c;直到到最后一个数。反会上一层遍历其它数。 每次递归到叶子节点就找到了一种组合&#xff0c;思路有了具体怎么实现&#xff1f; 1.怎么记录每条路径&#xff1f; 定义一个…

【Trick】获取kaggle账号的token和api(用于数据集下载)

0&#xff1a;操作背景 由于未来的科研需要用到Unet&#xff0c;但是运行学长的史山代码无法跑通&#xff0c;自己写了一个Unet并load学长的数据集效果也很差&#xff0c;于是打算从最最基础的开始&#xff0c;上github调用一个Unet并成功在公有数据集上跑一遍实例。 Unet的g…