什么是数据并行?
数据并行(Data Parallelism, DP)的核心思想是将大规模的数据集分割成若干个较小的数据子集,并将这些子集分配到不同的 NPU 计算节点上,每个节点运行相同的模型副本,但处理不同的数据子集。
1. 将数据区分为不同的mini-batch
将数据集切分为若干子集,每个mini-batch又不同的设备独立处理。如你有4个GPU,可以把数据集分为4分,每个GPU处理一个字数据集。
2. 模型参数同步
可以通过某一进程初始化全部模型参数后,向其他进程广播模型参数,实现同步。
3. 前向运算
每个设备独立计算前向运算。
4. 反向运算
每个设备计算损失的梯度。
5. 梯度聚合
当所有设备计算完各自的梯度后,对所有设备的梯度取平均,每个设备的模型参数根据平均梯度进行更新。
6. 参数更新
数据并行的参数更新是在数据切分、模型参数同步后进行的。更新前,每个进程的参数相同;更新时,平均梯度相同;故更新后,每个进程的参数也相同。
数据并行的基本操作
Reduce 归约
归约是函数式编程的概念。数据归约包括通过函数将一组数字归约为较小的一组数字。
如sum([1, 2, 3, 4, 5])=15, multiply([1, 2, 3, 4, 5])=120。
AllReduce
等效于执行Reduce操作后,将结果广播分配给所有进程。
MindSpore AllReduce
import numpy as np
from mindspore.communication import init
from mindspore.communication.comm_func import all_reduce
from mindspore import Tensor
init()
input_tensor = Tensor(np.ones([2, 8]).astype(np.float32))
output = all_reduce(input_tensor)
数据并行的主要计算思想
Parameter-Server
主要思想:所有node被分为server node和worker node
Server node:负责参数的存储和全局的聚合操作
Worker node:负责计算
Parameter-Server的问题:
- 假设有N=5张卡,GPU0作为Server,其余作为Worker
- 将大小为K的数据拆分为N-1份,分给每个Worker GPU
- 每个GPU计算得到local gradients
- N-1块GPU将计算所得的local gradients发送给GPU 0
- GPU 0对所有local gradients进行all reduce操作得到全局梯度,参数更新
- 将该新模型的参数返回给每张GPU
假设单个Worker到Server的通信开销为C,那么将local gradients送到GPU 0上的通信成本为C * (N - 1)。收到GPU 0通信带宽的影响,通信成本随着设备数的增加而线性增长。
Pytorch DataParallel
Pytorch DP在Parameter-Server的基础上,把GPU 0即当作Server也当作Worker。
1. 切分数据,但不切分Label
每个GPU进行正向计算之后,将正向计算结果聚合回GPU 0计算Loss,GPU 0计算完Loss的gradient之后,将梯度分发回其他worker GPU。随后各个GPU计算整个模型的grad,再将grad聚合回GPU 0,进行AllReduce。
2. 切分数据,同时切分Label
每张卡自己计算Loss即可,减少一次聚合操作。
Pytorch DataParallel 问题:
1. 为摆脱Parameter-Server模式,性能差。
2. 需要额外的GPU进行梯度聚合/ GPU 0需要额外的显存。GPU 0限制了其他GPU的上限。
Ring AllReduce
每张卡单向通讯,通讯开销一定。
每张卡占用的显存相同。
第一步:Scatter-Reduce
假设每张卡上各自计算好了梯度。
每张GPU依次传值:
重复直至:
第二步:All-Gather
将每一个累计值a / b / c逐个发送至个张卡
直至每张卡都有每层的梯度累计值。
两步分别做了四次通讯,便可以实现并行计算。
Ring AllReduce计算开销
- N-1次Scatter-Reduce
- N-1次All-Gather
- 每个GPUGPU一次通讯量为:K/N,K为总数据大小
- 每个GPU通信次数为:2(N-1)
总通信量为:
当N足够多时,通信量为一个常数2K。
Gradient Bucketing
集合通信在大张量上更有效。因此,可以在短时间内等待并将多个梯度存储到一个数据桶(Bucket),然后进行AllReduce操作。而不是对每个梯度立刻启动AllReduce操作。
MindSpore数据并行
def forward_fn(data, target):
logits = net(data)
loss = loss_fn(logits, target)
return loss, logits
grad_fn = ms.value_and_grad(forward_fn, None, net_trainable_param(), has_aux=True)
# 初始化reducer
grad_gather = nn.DistributedGradReducer(optimizer.parameters)
for epoch in range(10):
i = 0
for image, label in data_set:
(loss_value, _), grads = grad_fn(image, label)
# 进行通讯
grads = grad_reducer(grads)
optimizer(grads)
# ...
MindNLP数据并行
def update_gradient_by_distributed_type(self, model: nn.Module) -> None:
'''update gradient by distributed_type'''
if accelerate_distributed_type == DistributedType.NO:
return
if accelerate_distributed_type == DistrivutedType.MULTI_NPU:
from mindspore.communication import get_group_size
from mindspore.communication.comm_func iport all_reduce
rank_size = get_group_size()
for parameter in model.parameters():
# 进行all_reduce
new_grads_mean = all_reduce(parameter.grad) / rank_size
parameter.grad = new_grads_mean
数据并行的局限性
要求单卡可以放下模型
多卡训练时内存冗余,相同模型参数复制了多份。
MindSopre中的数据并行
1. 在启智社区创建云脑任务或华为云创建notebook
环境选择:mindspore==2.3.0, cann==8.0,昇腾910 * 2
2. 更新MindSpore框架版本
pip install --upgrade mindspore
同时可以查看NPU信息:
npu--smi info
3. 配置项目环境
克隆mindnlp项目
git clone https://github.com/mindspore-lab/mindnlp.git
下载mindnlp
cd mindnlp
bash scripts/build_and_reinstall.sh
下载完成后,卸载mindformers、soundfile
pip uninstall mindformers
4. 运行训练脚本
cd mindnlp/llm/parallel/bert_imdb_finetune_dp
msrun --worker_num=2 --local_worker_num=2 --master_port=8118 bert_imdb_finetune_cpu_mindnlp_trainer_npus_same.py
发现两个NPU都被占用
日志文件开始记录模型训练进度
成功实现数据并行!
基于MindSpore微调Roberta+数据并行
数据集:imdb影评数据集
微调代码:roberta.py
#!/usr/bin/env python
# coding: utf-8
"""
unset MULTI_NPU && python bert_imdb_finetune_cpu_mindnlp_trainer_npus_same.py
bash bert_imdb_finetune_npu_mindnlp_trainer.sh
"""
import mindspore.dataset as ds
from mindnlp.dataset import load_dataset
# loading dataset
imdb_ds = load_dataset('imdb', split=['train', 'test'])
imdb_train = imdb_ds['train']
imdb_test = imdb_ds['test']
imdb_train.get_dataset_size()
import numpy as np
def process_dataset(dataset, tokenizer, max_seq_len=512, batch_size=4, shuffle=False):
is_ascend = mindspore.get_context('device_target') == 'Ascend'
def tokenize(text):
if is_ascend:
tokenized = tokenizer(text, padding='max_length', truncation=True, max_length=max_seq_len)
else:
tokenized = tokenizer(text, truncation=True, max_length=max_seq_len)
return tokenized['input_ids'], tokenized['attention_mask']
if shuffle:
dataset = dataset.shuffle(batch_size)
# map dataset
dataset = dataset.map(operations=[tokenize], input_columns="text", output_columns=['input_ids', 'attention_mask'])
dataset = dataset.map(operations=transforms.TypeCast(mindspore.int32), input_columns="label", output_columns="labels")
# batch dataset
if is_ascend:
dataset = dataset.batch(batch_size)
else:
dataset = dataset.padded_batch(batch_size, pad_info={'input_ids': (None, tokenizer.pad_token_id),
'attention_mask': (None, 0)})
return dataset
from mindnlp.transformers import AutoTokenizer
import mindspore
import mindspore.dataset.transforms as transforms
# tokenizer
tokenizer = AutoTokenizer.from_pretrained('roberta-base')
dataset_train = process_dataset(yelp_ds_train, tokenizer, shuffle=True)
from mindnlp.transformers import AutoModelForSequenceClassification
# set bert config and define parameters for training
model = AutoModelForSequenceClassification.from_pretrained('AI-ModelScope/roberta-base', num_labels=2, mirror='modelscope')
from mindnlp.engine import TrainingArguments
training_args = TrainingArguments(
output_dir="./",
save_strategy="epoch",
logging_strategy="epoch",
num_train_epochs=3,
learning_rate=2e-5
)
training_args = training_args.set_optimizer(name="adamw", beta1=0.8)
from mindnlp.engine import Trainer
trainer = Trainer(
model=model,
args=training_args,
train_dataset=dataset_train
)
print('start training')
trainer.train()
运行命令:
msrun --worker_num=2 --local_worker_num=2 --master_port=8118 roberta.py