源码地址:https://gitee.com/guojialiang2023/gpt2
GPT2
- 模型
- Configuration
- 类定义
- Recorder
- 训练框架
- Trainer
- 训练代码
- `GPT2TrainingSpec` 类
- `train_gpt2_model` 函数
- `add_subparser` 函数
模型
Configuration
这段代码定义了一个名为 TrainConfig
的 Python 类,它主要用于配置和管理机器学习或深度学习训练过程中的相关参数。下面我将详细解释这个类的各个部分:
类定义
参数列表
batch_train: int
- 训练批量大小,指定在训练过程中每个批次处理的数据数量。
batch_eval: int
- 评估批量大小,指定在模型评估过程中每个批次处理的数据数量。
total_steps: int
- 总步数,指训练过程中的总迭代次数。
eval_steps: int
- 评估步数,指定每多少步进行一次模型评估。
save_steps: int
- 保存步数,指定每多少步保存一次模型。
save_model_path: str
- 模型保存路径,指定训练好的模型保存的位置。
save_checkpoint_path: str
- 检查点保存路径,用于保存训练过程中的中间状态,以便于恢复训练或进行故障恢复。
description: str
- 描述信息,用于记录这个训练配置的描述或备注。
log_format: str
- 日志格式,指定训练过程中日志的输出格式。
use_amp: bool
- 是否使用自动混合精度(Automatic Mixed Precision),这是一种优化训练过程的技术,可以提高性能同时减少内存使用。
gpus: int
- GPU数量,指定用于训练的GPU数量。
方法 distributed
@property
- 这是一个装饰器,用于将下面的方法定义为一个属性。
def distributed(self) -> bool:
- 这是一个名为
distributed
的属性,用来判断训练是否应该在多个 GPU 上分布执行。 - 如果
gpus
参数大于 1,表示有多个 GPU 可用,因此返回True
,表示训练是分布式的。否则返回False
。
- 这是一个名为
综上所述,这个 TrainConfig
类提供了一个结构化的方式来配置和管理机器学习训练过程中的关键参数,从而使训练过程更加清晰、可控。通过这种方式,用户可以方便地调整训练参数,以适应不同的训练需求和硬件环境。
class TrainConfig(object):
def __init__(self,
batch_train: int,
batch_eval: int,
total_steps: int,
eval_steps: int,
save_steps: int,
save_model_path: str,
save_checkpoint_path: str,
description: str,
log_format: str,
use_amp: bool,
gpus: int):
self.batch_train = batch_train
self.batch_eval = batch_eval
self.total_steps = total_steps
self.eval_steps = eval_steps
self.save_steps = save_steps
self.save_model_path = save_model_path
self.save_checkpoint_path = save_checkpoint_path
self.description = description
self.log_format = log_format
self.use_amp = use_amp
self.gpus = gpus
@property
def distributed(self) -> bool:
return self.gpus is not None and self.gpus > 1
Recorder
代码定义了一个名为 Recorder
的类,其目的是为了记录和处理度量指标(metrics)。类的结构和功能可以分为以下几个部分:
-
初始化 (
__init__
方法)self.metrics
:用于存储每一步累计度量指标的历史值。self.batch_metrics
:用于临时存储一批度量指标的值,在每次调用stamp
方法时,这些值将被处理并转移到self.metrics
。
-
记录度量指标 (
record
方法)- 参数
metrics
:一个字典,其中包含要记录的度量指标及其值。 - 参数
scope
:可选参数,用于给度量指标名称添加前缀,以区分不同的度量范围。 - 方法功能:此方法遍历
metrics
字典中的每个度量指标,并将其值添加到self.batch_metrics
。如果scope
不为空,则在度量指标名称前加上前缀。
- 参数
-
时间戳记录 (
stamp
方法)- 参数
step
:代表当前的步骤或时间点。 - 方法功能:此方法处理
self.batch_metrics
中的度量指标,计算每个指标的平均值,并将这些平均值与相应的步骤号一起存储到self.metrics
中。完成后,清空self.batch_metrics
以便于下一批度量指标的记录。
- 参数
-
格式化输出 (
format
方法)- 参数
fstring
:一个格式化字符串,用于定义输出格式。 - 方法功能:此方法将
self.metrics
中的最新度量指标值替换到fstring
中相应的占位符上。为了匹配格式化字符串中的占位符,度量指标名称的斜线(/
)被替换成下划线(_
)。
- 参数
from typing import Dict, Optional
class Recorder(object):
def __init__(self):
self.metrics = {}
self.batch_metrics = {}
def record(self, metrics: Dict[str, float], scope: Optional[str] = None):
for name, value in metrics.items():
name = f'{scope}/{name}' if scope else name
if name not in self.batch_metrics:
self.batch_metrics[name] = []
self.batch_metrics[name].append(value)
def stamp(self, step: int = 0):
for name, values in self.batch_metrics.items():
if name not in self.metrics:
self.metrics[name] = []
# Add the average of metrics values in the batch.
self.metrics[name].append((step, sum(values) / len(values)))
self.batch_metrics.clear()
def format(self, fstring: str) -> str:
return fstring.format(**{
k.replace('/', '_'): v[-1][1] for k, v in self.metrics.items()})
训练框架
代码定义了一个名为 TrainingSpec
的类,它提供了一个结构化的方式来定义和实现一个训练过程中的各个关键组件。以下是各个方法的详细说明:
-
初始化 (
initialize
方法)- 方法功能:这是一个初始化方法,用于执行任何必要的初始化任务。当前它没有执行任何操作,但可以在子类中被重写以进行特定的初始化工作。
-
准备数据集 (
prepare_datasets
方法)- 返回值:返回一个包含两个
Dataset
对象的元组,通常这两个对象分别代表训练集和验证集。 - 方法功能:这是一个抽象方法,意味着在子类中必须实现它。它负责加载或准备训练和验证数据集。
- 返回值:返回一个包含两个
-
构建模型 (
construct_model
方法)- 返回值:返回一个 PyTorch 的
nn.Module
对象,即神经网络模型。 - 方法功能:这同样是一个抽象方法,需要在子类中定义具体的模型结构。
- 返回值:返回一个 PyTorch 的
-
创建优化器 (
create_optimizer
方法)- 参数
params
:一个神经网络参数的迭代器。 - 返回值:返回一个包含优化器 (
optim.Optimizer
) 和学习率调度器 (optim.lr_scheduler._LRScheduler
) 的元组。 - 方法功能:这个方法也是抽象的,需要在子类中实现。它负责创建用于训练模型的优化器和学习率调度器。
- 参数
-
训练目标 (
train_objective
方法)- 参数
data
:一个包含输入数据的字典。 - 参数
model
:当前的模型对象。 - 返回值:返回一个包含训练过程中计算的度量指标的字典。
- 方法功能:这是另一个抽象方法,用于定义训练过程中的损失计算和任何额外的度量指标。
- 参数
-
评估目标 (
eval_objective
方法)- 参数
data
:一个包含输入数据的字典。 - 参数
model
:当前的模型对象。 - 返回值:返回一个包含评估过程中计算的度量指标的字典。
- 方法功能:这个方法与
train_objective
类似,但用于评估或验证过程。
- 参数
TrainingSpec
类提供了一个框架,允许用户通过继承该类并实现这些方法来定义特定于他们任务的训练和评估过程。这种方法使得代码更加模块化和可重用,同时也提供了一种结构化的方式来组织训练流程。在实际应用中,用户需要根据具体的应用场景来实现这些方法。
import torch
import torch.nn as nn
import torch.optim as optim
from data import Dataset
from typing import Tuple, Iterator, Dict
class TrainingSpec(object):
def initialize(self):
pass
def prepare_datasets(self) -> Tuple[Dataset, Dataset]:
raise NotImplementedError()
def construct_model(self) -> nn.Module:
raise NotImplementedError()
def create_optimizer(self, params: Iterator[nn.Parameter]
) -> Tuple[optim.Optimizer,
optim.lr_scheduler._LRScheduler]:
raise NotImplementedError()
def train_objective(self, data: Dict[str, torch.Tensor], model: nn.Module
) -> Dict[str, torch.Tensor]:
raise NotImplementedError()
def eval_objective(self, data: Dict[str, torch.Tensor], model: nn.Module
) -> Dict[str, torch.Tensor]:
raise NotImplementedError()
Trainer
代码定义了一个名为 Trainer
的类,用于处理训练流程中的各个环节,包括分布式训练、自动混合精度训练(AMP)、数据加载、模型训练和评估、以及保存模型和训练状态。以下是该类的主要组成部分和功能:
-
初始化 (
__init__
方法)- 参数
spec
:一个TrainingSpec
对象,包含了训练过程中所需的所有规范和方法。 - 参数
config
:一个TrainConfig
对象,包含了训练的配置信息,例如是否使用分布式训练、是否使用AMP、批大小等。
- 参数
-
训练方法 (
train
方法)- 参数
from_checkpoint
和from_pretrained
:用于指定从哪个检查点或预训练模型开始训练。 - 功能:根据配置选择是进行单机训练还是分布式训练。在分布式训练中,它使用
mp.spawn
方法来启动多个进程。
- 参数
-
内部训练方法 (
_train
方法)- 包含训练的主要逻辑:设置分布式环境、初始化模型、加载数据集、创建优化器和学习率调度器、训练循环、评估循环、保存检查点等。
- 在每个训练步骤和评估步骤中,记录性能指标,并在必要时保存模型和训练状态。
-
训练步骤 (
_train_step
方法)- 实现单个训练步骤的逻辑,包括获取数据、计算损失、反向传播以及优化器和调度器的更新。
-
评估步骤 (
_eval_step
方法)- 实现单个评估步骤的逻辑,用于在验证集上评估模型性能。
-
数据获取方法 (
_fetch_from
方法)- 用于从数据集中获取数据,考虑到分布式训练中每个进程只处理一部分数据。
-
转换为标量值 (
_to_value
方法)- 将 PyTorch 张量转换为 Python 浮点数。在分布式训练中,它还包括跨不同进程的张量归约。
import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
import torch.multiprocessing as mp
from data import Dataset
from train import TrainingSpec, TrainConfig, Recorder
from typing import Dict, Optional
try:
from apex import amp
except ModuleNotFoundError:
pass
import warnings
warnings.filterwarnings(action='ignore')
class Trainer(object):
def __init__(self, spec: TrainingSpec, config: TrainConfig):
self.spec = spec
self.config = config
def train(self,
from_checkpoint: Optional[str] = None,
from_pretrained: Optional[str] = None):
if self.config.distributed:
mp.spawn(self._train, args=(from_checkpoint, from_pretrained),
nprocs=self.config.gpus)
else:
self._train(0, from_checkpoint, from_pretrained)
def _train(self,
rank: int,
from_checkpoint: Optional[str] = None,
from_pretrained: Optional[str] = None):
if self.config.distributed:
torch.cuda.set_device(rank)
dist.init_process_group(backend='nccl',
init_method='tcp://127.0.0.1:8000',
world_size=self.config.gpus,
rank=rank)
# Initialize training environment and prepare datasets.
self.spec.initialize()
train_dataset, eval_dataset = self.spec.prepare_datasets()
# Construct a model and load its pretrained weights.
model = self.spec.construct_model().cuda()
if from_pretrained:
ckpt = torch.load(from_pretrained, map_location='cuda')
model.load_state_dict(ckpt['model'])
# Because the weights data allocates quite a lot of GPU memories,
# we need to free the memories explicitly.
del ckpt
torch.cuda.empty_cache()
# Create an optimizer and learning rate scheduler.
optimizer, scheduler = self.spec.create_optimizer(model.parameters())
recorder = Recorder()
if self.config.use_amp:
model, optimizer = amp.initialize(
model, optimizer, opt_level='O2', verbosity=0)
if self.config.distributed:
model = nn.parallel.DistributedDataParallel(
model, device_ids=[rank])
start_step = 0
# Restore last training states from checkpoint.
if from_checkpoint:
ckpt = torch.load(from_checkpoint, map_location='cuda')
start_step = ckpt['step']
recorder = ckpt['recorder']
model.load_state_dict(ckpt['model'])
optimizer.load_state_dict(ckpt['optimizer'])
scheduler.load_state_dict(ckpt['scheduler'])
train_dataset.assign(ckpt['train_dataset'])
eval_dataset.assign(ckpt['eval_dataset'])
if self.config.use_amp:
amp.load_state_dict(ckpt['amp'])
# Because the checkpoint data allocates quite a lot of GPU
# memories, we need to free the memories explicitly.
del ckpt
torch.cuda.empty_cache()
if rank == 0:
# Create tqdm iterator in master process to show the progress of
# training.
training_iters = tqdm.tqdm(
range(start_step + 1, self.config.total_steps),
total=self.config.total_steps,
desc=self.config.description,
dynamic_ncols=True)
training_iters.update(start_step + 1)
else:
# In other processes, use simple iterator rather than tqdm one.
training_iters = range(start_step + 1, self.config.total_steps)
for step in training_iters:
# Clear CUDA cache which is used for training.
torch.cuda.empty_cache()
recorder.record(
self._train_step(rank, train_dataset, model, optimizer,
scheduler),
scope='train')
# Clear CUDA cache which is used for evaluation.
torch.cuda.empty_cache()
if (step + 1) % self.config.eval_steps == 0:
recorder.record(
self._eval_step(rank, eval_dataset, model), scope='eval')
recorder.stamp(step)
if rank == 0:
training_iters.set_postfix_str(
recorder.format(self.config.log_format))
# Save training states to checkpoint file.
if rank == 0 and (step + 1) % self.config.save_steps == 0:
ckpt = {'step': step,
'recorder': recorder,
'model': model.state_dict(),
'optimizer': optimizer.state_dict(),
'scheduler': scheduler.state_dict(),
'train_dataset': train_dataset.where(),
'eval_dataset': eval_dataset.where()}
if self.config.use_amp:
ckpt['amp'] = amp.state_dict()
torch.save(ckpt, self.config.save_checkpoint_path)
# Because the checkpoint data allocates quite a lot of GPU
# memories, we need to free the memories explicitly.
del ckpt
torch.cuda.empty_cache()
# Since the model is wrapped with `DistributedDataParallel` class in
# distributed training environment, the original model can be accessed
# by `module` attribute.
if self.config.distributed:
model = model.module
# Save trained model weights and metrics recorded during the training.
if rank == 0:
torch.save({'model': model.cpu().state_dict(),
'metrics': recorder.metrics},
self.config.save_model_path)
def _train_step(self,
rank: int,
dataset: Dataset,
model: nn.Module,
optimizer: optim.Optimizer,
scheduler: optim.lr_scheduler._LRScheduler
) -> Dict[str, float]:
model.train()
optimizer.zero_grad()
data = self._fetch_from(dataset, rank, self.config.batch_train)
metrics = self.spec.train_objective(data, model)
loss = metrics['loss']
if self.config.use_amp:
with amp.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward()
else:
loss.backward()
optimizer.step()
scheduler.step()
return {k: self._to_value(v) for k, v in metrics.items()}
@torch.no_grad()
def _eval_step(self, rank: int, dataset: Dataset, model: nn.Module
) -> Dict[str, float]:
model.eval()
data = self._fetch_from(dataset, rank, self.config.batch_eval)
metrics = self.spec.eval_objective(data, model)
return {k: self._to_value(v) for k, v in metrics.items()}
def _fetch_from(self, dataset: Dataset, rank: int, batch: int
) -> Dict[str, torch.Tensor]:
if self.config.distributed:
# In distributed training environment, each process must ignore
# sub-batches of other processes and fetch corresponding one only.
batch = batch // self.config.gpus
dataset.skip(rank * batch)
data = dataset.fetch(batch)
dataset.skip((self.config.gpus - rank - 1) * batch)
else:
data = dataset.fetch(self.config.batch_train)
return {k: v.cuda() for k, v in data.items()}
def _to_value(self, tensor: torch.Tensor) -> float:
if self.config.distributed:
tensor = tensor.clone()
dist.all_reduce(tensor, op=dist.reduce_op.SUM)
return (tensor / self.config.gpus).item()
else:
return tensor.item()
训练代码
代码定义了一个用于训练 GPT-2 模型的程序,其中包含了用于训练的 GPT2TrainingSpec
类和 train_gpt2_model
函数,以及命令行参数解析的 add_subparser
函数。以下是代码的主要组成部分和功能:
GPT2TrainingSpec
类
GPT2TrainingSpec
是 TrainingSpec
的一个子类,专门用于训练 GPT-2 模型。它重写了 TrainingSpec
类的几个关键方法,以适应 GPT-2 模型的特定需求。具体来说:
__init__
方法:初始化训练规范,包括各种模型配置和训练设置。initialize
方法:初始化词汇表和损失函数。prepare_datasets
方法:准备训练和评估数据集。construct_model
方法:构建 GPT-2 模型。create_optimizer
方法:创建优化器和学习率调度器。train_objective
和eval_objective
方法:定义训练和评估目标。
train_gpt2_model
函数
这个函数是训练 GPT-2 模型的入口点。它接受一个命令行参数对象 argparse.Namespace
,并根据这些参数设置 GPT-2 训练的规范和配置。这个函数首先创建 GPT2TrainingSpec
和 TrainConfig
实例,然后使用这些实例创建一个 Trainer
对象来执行训练过程。
add_subparser
函数
这个函数用于添加命令行参数解析器,定义了用于训练 GPT-2 模型的所有必要命令行参数。它允许用户指定诸如模型配置、训练和评估批次大小、学习率、权重衰减率、训练步数、评估和保存模型的步数间隔、以及是否使用自动混合精度和梯度检查点等选项。
import argparse
import torch
import torch.nn as nn
import torch.optim as optim
from model import Transformer
from data import Dataset, Vocab, TokenizedCorpus
from train import TrainConfig, TrainingSpec, Trainer
from typing import Tuple, Iterator, Dict
try:
from apex.optimizers import FusedAdam as Adam
from apex.normalization import FusedLayerNorm as LayerNorm
except ModuleNotFoundError:
from torch.optim import AdamW as Adam
from torch.nn import LayerNorm
class GPT2TrainingSpec(TrainingSpec):
def __init__(self, train_corpus: str, eval_corpus: str, vocab_path: str,
seq_len: int, layers: int, heads: int, dims: int, rate: int,
dropout: float, base_lr: float, wd_rate: float,
total_steps: int, use_grad_ckpt: bool):
self.train_corpus = train_corpus
self.eval_corpus = eval_corpus
self.vocab_path = vocab_path
self.seq_len = seq_len
self.layers = layers
self.heads = heads
self.dims = dims
self.rate = rate
self.dropout = dropout
self.base_lr = base_lr
self.wd_rate = wd_rate
self.total_steps = total_steps
self.use_grad_ckpt = use_grad_ckpt
def initialize(self):
self.vocab = Vocab(vocab_path=self.vocab_path)
self.criterion = nn.CrossEntropyLoss(ignore_index=self.vocab.pad_idx,
reduction='mean')
def prepare_datasets(self) -> Tuple[Dataset, Dataset]:
train_dataset = TokenizedCorpus(corpus_path=self.train_corpus,
vocab=self.vocab,
seq_len=self.seq_len)
eval_dataset = TokenizedCorpus(corpus_path=self.eval_corpus,
vocab=self.vocab,
seq_len=self.seq_len)
return train_dataset, eval_dataset
def construct_model(self) -> nn.Module:
return Transformer(layers=self.layers, pad_idx=self.vocab.pad_idx,
words=len(self.vocab), seq_len=self.seq_len,
heads=self.heads, dims=self.dims, rate=self.rate,
dropout=self.dropout, bidirectional=False)
def create_optimizer(self, params: Iterator[nn.Parameter]
) -> Tuple[optim.Optimizer,
optim.lr_scheduler._LRScheduler]:
optimizer = Adam(
params, lr=self.base_lr, weight_decay=self.wd_rate)
scheduler = optim.lr_scheduler.LambdaLR(
optimizer, lambda step: 1 - step / self.total_steps)
return optimizer, scheduler
def train_objective(self, data: Dict[str, torch.Tensor], model: nn.Module
) -> Dict[str, torch.Tensor]:
logits = model(data['input'], use_grad_ckpt=self.use_grad_ckpt)
loss = self.criterion(logits.transpose(1, 2), data['output'])
return {'loss': loss}
def eval_objective(self, data: Dict[str, torch.Tensor], model: nn.Module
) -> Dict[str, torch.Tensor]:
logits, _ = model(data['input'], past=None)
loss = self.criterion(logits.transpose(1, 2), data['output'])
return {'loss': loss}
def train_gpt2_model(args: argparse.Namespace):
spec = GPT2TrainingSpec(
train_corpus=args.train_corpus, eval_corpus=args.eval_corpus,
vocab_path=args.vocab_path, seq_len=args.seq_len, layers=args.layers,
heads=args.heads, dims=args.dims, rate=args.rate, dropout=args.dropout,
base_lr=args.base_lr, wd_rate=args.wd_rate,
total_steps=args.total_steps, use_grad_ckpt=args.use_grad_ckpt)
config = TrainConfig(
batch_train=args.batch_train, batch_eval=args.batch_eval,
total_steps=args.total_steps, eval_steps=args.eval_steps,
save_steps=args.save_steps, save_model_path=args.save_model_path,
save_checkpoint_path=args.save_checkpoint_path,
description='Train GPT-2 model',
log_format='train/loss: {train_loss:.4f}, eval/loss: {eval_loss:.4f}',
use_amp=args.use_amp, gpus=args.gpus)
Trainer(spec, config).train(from_checkpoint=args.from_checkpoint,
from_pretrained=args.from_pretrained)
def add_subparser(subparsers: argparse._SubParsersAction):
parser = subparsers.add_parser('train', help='train GPT-2 model')
group = parser.add_argument_group('Corpus and vocabulary')
group.add_argument('--train_corpus', required=True,
help='training corpus file path')
group.add_argument('--eval_corpus', required=True,
help='evaluation corpus file path')
group.add_argument('--vocab_path', required=True,
help='vocabulary file path')
group = parser.add_argument_group('Model configurations')
group.add_argument('--seq_len', default=64, type=int,
help='maximum sequence length')
group.add_argument('--layers', default=12, type=int,
help='number of transformer layers')
group.add_argument('--heads', default=16, type=int,
help='number of multi-heads in attention layer')
group.add_argument('--dims', default=1024, type=int,
help='dimension of representation in each layer')
group.add_argument('--rate', default=4, type=int,
help='increase rate of dimensionality in bottleneck')
group.add_argument('--dropout', default=0.1, type=float,
help='probability that each element is dropped')
group = parser.add_argument_group('Training and evaluation')
group.add_argument('--batch_train', default=64, type=int,
help='number of training batch size')
group.add_argument('--batch_eval', default=64, type=int,
help='number of evaluation batch size')
group.add_argument('--base_lr', default=1e-4, type=float,
help='default learning rate')
group.add_argument('--wd_rate', default=1e-2, type=float,
help='weight decay rate')
group.add_argument('--total_steps', default=1000000, type=int,
help='number of total training steps')
group.add_argument('--eval_steps', default=500, type=int,
help='period to evaluate model and record metrics')
group.add_argument('--save_steps', default=1000, type=int,
help='period to save training state to checkpoint')
group = parser.add_argument_group('Saving and restoring')
group.add_argument('--save_model_path', default='model.pth',
help='save trained model weights to the file')
group.add_argument('--save_checkpoint_path', default='checkpoint.pth',
help='save training state to the checkpoint file')
group.add_argument('--from_checkpoint', default=None,
help='load last training state from checkpoint file')
group.add_argument('--from_pretrained', default=None,
help='initialize parameters from pretrained model')
group = parser.add_argument_group('Extensions')
group.add_argument('--use_amp', action='store_true',
help='use automatic mixed-precision in training')
group.add_argument('--use_grad_ckpt', action='store_true',
help='use gradient checkpointing in transformer layers')
group.add_argument('--gpus', default=None, type=int,
help='number of gpu devices to use in training')
parser.set_defaults(func=train_gpt2_model)