OpenAIGPTModel源码解析
- 1. GPT 介绍
- 2. OpenAIGPTModel类 源码解析
说到ChatGPT,大家可能都使用过吧。2022年,ChatGPT的推出引发了广泛的关注和讨论。这款对话生成模型不仅具备了强大的语言理解和生成能力,还能进行非常自然的对话,给用户带来了全新的互动体验。然而,ChatGPT的成功背后离不开它的前身——GPT。
1. GPT 介绍
GPT(Generative Pre-trained Transformer)是由OpenAI开发的一种基于Transformer架构的大型语言模型。它由多个堆叠的自注意力解码器层(Transformer Blocks)组成,每一层包含多头自注意力机制和前馈神经网络,并配有残差连接和层归一化以稳定训练。GPT采用自回归方式生成文本,通过在大规模互联网数据上进行预训练,具备强大的自然语言理解和生成能力,能够完成对话生成、文本补全等多种任务。其结构如下:
2. OpenAIGPTModel类 源码解析
源码地址:transformers/src/transformers/models/openai/modeling_openai.py
# -*- coding: utf-8 -*-
# @time: 2024/9/3 20:39
from typing import Optional, Union, Tuple
import torch
from torch import nn
from transformers import add_start_docstrings, OpenAIGPTPreTrainedModel
from transformers.modeling_outputs import BaseModelOutput
from transformers.models.openai.modeling_openai import OPENAI_GPT_START_DOCSTRING, Block, OPENAI_GPT_INPUTS_DOCSTRING, _CHECKPOINT_FOR_DOC, _CONFIG_FOR_DOC
from transformers.utils import add_start_docstrings_to_model_forward, add_code_sample_docstrings
@add_start_docstrings(
"The bare OpenAI GPT transformer model outputting raw hidden-states without any specific head on top.",
OPENAI_GPT_START_DOCSTRING,
)
class OpenAIGPTModel(OpenAIGPTPreTrainedModel):
def __init__(self, config):
super().__init__(config)
self.tokens_embed = nn.Embedding(config.vocab_size, config.n_embd) # 定义 token 嵌入层
self.positions_embed = nn.Embedding(config.n_positions, config.n_embd) # 定义 position 嵌入层
self.drop = nn.Dropout(config.embd_pdrop) # 定义 drop 层
self.h = nn.ModuleList([Block(config.n_positions, config, scale=True) for _ in range(config.n_layer)]) # 定义多个 Block 层
# 注册一个缓冲区用于存储position_ids,初始化为从 0 到 config.n_positions 的序列
self.register_buffer("position_ids", torch.arange(config.n_positions), persistent=False)
# Initialize weights and apply final processing
self.post_init()
def get_input_embeddings(self):
return self.tokens_embed
def set_input_embeddings(self, new_embeddings):
self.tokens_embed = new_embeddings
def _prune_heads(self, heads_to_prune):
"""
Prunes heads of the model. heads_to_prune: dict of {layer_num: list of heads to prune in this layer}
"""
# 剪掉模型多头注意力机制中的一些头,heads_to_prune 是一个字典,键为layer_num,值为需要剪枝的 heads 列表。
for layer, heads in heads_to_prune.items():
self.h[layer].attn.prune_heads(heads)
@add_start_docstrings_to_model_forward(OPENAI_GPT_INPUTS_DOCSTRING)
@add_code_sample_docstrings(
checkpoint=_CHECKPOINT_FOR_DOC,
output_type=BaseModelOutput,
config_class=_CONFIG_FOR_DOC,
)
def forward(
self,
input_ids: Optional[torch.LongTensor] = None,
attention_mask: Optional[torch.FloatTensor] = None,
token_type_ids: Optional[torch.LongTensor] = None,
position_ids: Optional[torch.LongTensor] = None,
head_mask: Optional[torch.FloatTensor] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
) -> Union[Tuple[torch.Tensor], BaseModelOutput]:
# 根据 config 配置设定 output_attentions, output_hidden_states, return_dict 的值
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
)
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
# 获取 input_ids 或者 inputs_embeds 以及 input_shape
if input_ids is not None and inputs_embeds is not None: # 当 input_ids 和 inputs_embeds 同时存在时,抛出错误
raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time")
elif input_ids is not None: # 如果存在 input_ids,将其形状调整为 (batch_size, sequence_length)
self.warn_if_padding_and_no_attention_mask(input_ids, attention_mask)
input_shape = input_ids.size()
input_ids = input_ids.view(-1, input_shape[-1])
elif inputs_embeds is not None: # 如果存在 inputs_embeds,获取其形状
input_shape = inputs_embeds.size()[:-1]
else: # 如果 input_ids 和 inputs_embeds 都不存在,抛出错误
raise ValueError("You have to specify either input_ids or inputs_embeds")
# 如果没有传入 position_ids,则生成默认的 position_ids
if position_ids is None:
# Code is different from when we had a single embedding matrix from position and token embeddings
position_ids = self.position_ids[None, : input_shape[-1]]
# ------------------------------------- 1. 获取 attention_mask -----------------------------#
# Attention mask.
if attention_mask is not None:
# We create a 3D attention mask from a 2D tensor mask.
# Sizes are [batch_size, 1, 1, to_seq_length]
# So we can broadcast to [batch_size, num_heads, from_seq_length, to_seq_length]
# this attention mask is more simple than the triangular masking of causal attention
# used in OpenAI GPT, we just need to prepare the broadcast dimension here.
attention_mask = attention_mask.unsqueeze(1).unsqueeze(2) # 将 2D 掩码扩展为 3D 掩码,适用于批量输入
# Since attention_mask is 1.0 for positions we want to attend and 0.0 for
# masked positions, this operation will create a tensor which is 0.0 for
# positions we want to attend and the dtype's smallest value for masked positions.
# Since we are adding it to the raw scores before the softmax, this is
# effectively the same as removing these entirely.
# 将注意力掩码转换为与模型参数相同的数据类型,并进行数值变换,torch.finfo(self.dtype).min 返回数据类型的最小值。
attention_mask = attention_mask.to(dtype=next(self.parameters()).dtype) # fp16 compatibility
attention_mask = (1.0 - attention_mask) * torch.finfo(self.dtype).min
# ----------------------------------------------------------------------------------------#
# ------------------------------------- 2. 获取 head_mask ---------------------------------#
# Prepare head mask if needed
head_mask = self.get_head_mask(head_mask, self.config.n_layer)
# ---------------------------------------------------------- -----------------------------#
# ------------------------------------- 3. 获取 hidden_states -----------------------------#
# 如果 inputs_embeds 为 None,则使用 tokens_embed 对 input_ids 计算
if inputs_embeds is None:
inputs_embeds = self.tokens_embed(input_ids)
# 计算 position_embeds
position_embeds = self.positions_embed(position_ids)
# 如果存在 token_type_ids,使用 tokens_embed 计算;否则 token_type_embeds 为 0
if token_type_ids is not None:
token_type_ids = token_type_ids.view(-1, token_type_ids.size(-1))
token_type_embeds = self.tokens_embed(token_type_ids)
else:
token_type_embeds = 0
# 计算 hidden_states,即inputs_embeds、position_embeds 和 token_type_embeds 之和,并使用 dropout
hidden_states = inputs_embeds + position_embeds + token_type_embeds
hidden_states = self.drop(hidden_states)
# -------------------------------------------------------------------------------------#
# 获取输出形状,以及初始化输出结果 all_attentions 和 all_hidden_states
output_shape = input_shape + (hidden_states.size(-1),)
all_attentions = () if output_attentions else None
all_hidden_states = () if output_hidden_states else None
# -----------------------------------4. Block逐层计算处理(核心部分)--------------------#
for i, block in enumerate(self.h):
# 如果需要输出 hidden states,将当前 hidden_states 添加到 all_hidden_states
if output_hidden_states:
all_hidden_states = all_hidden_states + (hidden_states,)
# 通过当前 Block 处理 hidden_states,得到新的 hidden_states 和 attentions
outputs = block(hidden_states, attention_mask, head_mask[i], output_attentions=output_attentions)
hidden_states = outputs[0]
# 如果需要输出 attentions,将当前 attentions 添加到 all_attentions
if output_attentions:
all_attentions = all_attentions + (outputs[1],)
# ---------------------------------------------------------------------------------#
# 将 hidden_states 的形状调整为输出形状
hidden_states = hidden_states.view(*output_shape)
# 如果需要输出 hidden states,将最后的 hidden_states 添加到 all_hidden_states
if output_hidden_states:
all_hidden_states = all_hidden_states + (hidden_states,)
# -----------------------------------5. 根据配置的输出方式输出结果-------------------------------#
if not return_dict:
return tuple(v for v in [hidden_states, all_hidden_states, all_attentions] if v is not None)
return BaseModelOutput(
last_hidden_state=hidden_states,
hidden_states=all_hidden_states,
attentions=all_attentions,
)