前言:
参考: https://www.datacamp.com/tutorial/building-a-transformer-with-py-torch,
使用PyTorch 构建Transformer 主要分为下面几步,
- 定义基本构建块——多头注意力、位置前馈网络、位置编码
- 构建编码器块
- 构建解码器块
- 结合编码器和解码器层来创建完整的Transformer网络
这里面使用了Transformer NAT方案,
李沫的代码里面使用的是AT方案.
目录:
- 定义基本构建块:多头注意力、位置前馈网络、位置编码
- 构建编码器模块
- 构建解码器模块
- 结合编码器和解码器层来创建完整的 Transformer 网络
- 训练 PyTorch Transformer 模型
- Transformer 模型性能评估
- 完整代码以及面试题
一 定义基本构建块
1.1 多头注意力机制
- d_model: 输入的维数。
- num_heads:将输入分割成的注意力头的数量
- 计算注意力得分:attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)。这里,注意力得分是通过对查询 (Q) 和键 (K) 进行点积计算得出的,然后按键维度 (d_k) 的平方根缩放。
- 应用掩码:如果提供了掩码,则将其应用于注意力分数以掩盖特定值。
- 计算注意力权重:注意力分数通过 softmax 函数将其转换为总和为 1 的概率。
- 计算输出:注意力的最终输出是通过将注意力权重乘以值(V)来计算的。
class MultiHeadAttention(nn.Module):
def __init__(self, d_model, num_heads):
super(MultiHeadAttention, self).__init__()
# Ensure that the model dimension (d_model) is divisible by the number of heads
assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
# Initialize dimensions
self.d_model = d_model # Model's dimension
self.num_heads = num_heads # Number of attention heads
self.d_k = d_model // num_heads # Dimension of each head's key, query, and value
# Linear layers for transforming inputs
self.W_q = nn.Linear(d_model, d_model) # Query transformation
self.W_k = nn.Linear(d_model, d_model) # Key transformation
self.W_v = nn.Linear(d_model, d_model) # Value transformation
self.W_o = nn.Linear(d_model, d_model) # Output transformation
def scaled_dot_product_attention(self, Q, K, V, mask=None):
# Calculate attention scores
attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
# Apply mask if provided (useful for preventing attention to certain parts like padding)
if mask is not None:
attn_scores = attn_scores.masked_fill(mask == 0, -1e9)
# Softmax is applied to obtain attention probabilities
attn_probs = torch.softmax(attn_scores, dim=-1)
# Multiply by values to obtain the final output
output = torch.matmul(attn_probs, V)
return output
def split_heads(self, x):
# Reshape the input to have num_heads for multi-head attention
batch_size, seq_length, d_model = x.size()
return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)
def combine_heads(self, x):
# Combine the multiple heads back to original shape
batch_size, _, seq_length, d_k = x.size()
return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)
def forward(self, Q, K, V, mask=None):
# Apply linear transformations and split heads
Q = self.split_heads(self.W_q(Q))
K = self.split_heads(self.W_k(K))
V = self.split_heads(self.W_v(V))
# Perform scaled dot-product attention
attn_output = self.scaled_dot_product_attention(Q, K, V, mask)
# Combine heads and apply output transformation
output = self.W_o(self.combine_heads(attn_output))
return output
1.2 位置前馈网络
- d_model:模型输入和输出的维数。
- d_ff:前馈网络内层的维数。
- self.fc1 和 self.fc2:两个完全连接(线性)层,其输入和输出维度由 d_model 和 d_ff 定义。
- self.relu:ReLU(整流线性单元)激活函数,在两个线性层之间引入非线性。
class PositionWiseFeedForward(nn.Module):
def __init__(self, d_model, d_ff):
super(PositionWiseFeedForward, self).__init__()
self.fc1 = nn.Linear(d_model, d_ff)
self.fc2 = nn.Linear(d_ff, d_model)
self.relu = nn.ReLU()
def forward(self, x):
return self.fc2(self.relu(self.fc1(x)))
1.3 位置编码
位置编码用于注入输入序列中每个 token 的位置信息。它使用不同频率的正弦和余弦函数来生成位置编码。
- d_model:模型输入的维度。
- max_seq_length:预先计算位置编码的序列的最大长度。
- pe:一个用零填充的张量,它将用位置编码填充。
- 位置:包含序列中每个位置的位置索引的张量。
- div_term:用于以特定方式缩放位置索引的术语。
- 将正弦函数应用于偶数指标,将余弦函数应用于 pe 的奇数指标。
- 最后,pe 被注册为缓冲区,这意味着它将成为模块状态的一部分,但不会被视为可训练参数。
class PositionalEncoding(nn.Module):
def __init__(self, d_model, max_seq_length):
super(PositionalEncoding, self).__init__()
pe = torch.zeros(max_seq_length, d_model)
position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
self.register_buffer('pe', pe.unsqueeze(0))
def forward(self, x):
return x + self.pe[:, :x.size(1)]
总结
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 4 21:48:26 2024
@author: cxf
"""
import torch
import torch.nn as nn
import math
class MultiHeadAttention(nn.Module):
def __init__(self, d_model, num_heads):
super(MultiHeadAttention,self).__init__()
#确保模型维度d_model能被头数heads整除
assert d_model%num_heads ==0
#initialize dimensions
self.d_model = d_model #Model's dimension
self.num_heads = num_heads #number of attention heads
self.d_k = d_model//num_heads
#Linear layers for transforming inputs
self.W_q = nn.Linear(d_model, d_model) #query transformation
self.W_k = nn.Linear(d_model, d_model) #key transformation
self.W_v = nn.Linear(d_model, d_model) #value transformation
self.W_o= nn.Linear(d_model, d_model) #output transformation
def scaled_dot_product_attention(self,Q, K,V, mask=None):
#calculate attention scores
attn_scores = torch.matmul(Q,K.transpose(-2,-1))/math.sqrt(self.d_k)
#Apply mask if provided (useful for preventing attention to certain parts like padding)
if mask is not None:
attn_scores = attn_scores.masked_fill(mask==0,-1e9)
attn_probs = torch.softmax(attn_scores,dim=-1)
# Multiply by values to obtain the final output
output = torch.matmul(attn_probs,V)
return output
def split_heads(self, x):
# Reshape the input to have num_heads for multi-head attention
batch_size, seq_length, d_model = x.size()
#output.shape =(batch_size,num_heads, seq_length,d_k)
output = x.view(batch_size,seq_length,self.num_heads,self.d_k).transpose(1,2)
return output
def combine_heads(self, x):
# Combine the multiple heads back to original shape
batch_size, nheads, seq_length, d_k = x.size()
output = x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)
return output
def forward(self, Q, K, V, mask=None):
# Apply linear transformations and split heads
Q = self.split_heads(self.W_q(Q))
K = self.split_heads(self.W_k(K))
V = self.split_heads(self.W_v(V))
# Perform scaled dot-product attention
attn_output = self.scaled_dot_product_attention(Q, K, V, mask)
# Combine heads and apply output transformation [batch_size, seq_length, d_model]
output = self.W_o(self.combine_heads(attn_output))
return output
class PositionWiseFeedForward(nn.Module):
def __init__(self, d_model, d_ff):
super(PositionWiseFeedForward, self).__init__()
self.fc1 = nn.Linear(d_model, d_ff)
self.fc2 = nn.Linear(d_ff, d_model)
self.relu = nn.ReLU()
def forward(self, x):
hidden = self.relu(self.fc1(x))
output = self.fc2(hidden)
return output
class PositionalEncoding(nn.Module):
def __init__(self, d_model, max_seq_length):
super(PositionalEncoding,self).__init__()
pe = torch.zeros(max_seq_length, d_model)
position = torch.arange(0, max_seq_length).unsqueeze(0)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
#div_term =torch.pow(10000, torch.arange(0, d_model, 2)/d_model)
div_term =torch.pow(10000, torch.arange(0, d_model, 2)/d_model)
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
self.register_buffer('pe', pe.unsqueeze(0))
def forward(self, x):
output = x+self.pe[:,:x.size(1)]
return output
if __name__ == "__main__":
d_model = 512
nheads = 8
batch_size = 2
seq_length = 10
net = MultiHeadAttention(d_model, nheads)
query = torch.rand(batch_size,seq_length,d_model)
key = torch.rand(batch_size,seq_length,d_model)
value = torch.rand(batch_size,seq_length,d_model)
out = net(query,key,value,None)
print(out.shape)
二 构建编码器模块
参数:
- d_model:输入的维数。
- num_heads:多头注意力机制中注意力头的数量。
- d_ff:位置前馈网络内层的维数。
- dropout:用于正则化的 dropout 率。
函数:
- self.self_attn:多头注意力机制。
- self.feed_forward:位置前馈神经网络。
- self.norm1 和 self.norm2:层规范化,用于平滑层的输入。
- self.dropout:Dropout 层,用于在训练期间随机将一些激活设置为零以防止过度拟合。
输入:
- x:编码器层的输入。
- mask:可选掩码,用于忽略输入的某些部分。
前向传播:
- 自注意力:输入 x 经过多头自注意力机制。
- Add & Normalize(Attention 之后):将 Attention 输出添加到原始输入(残差连接)中,然后进行 dropout 并使用 norm1 进行规范化。
- 前馈网络:上一步的输出通过位置前馈网络。
- 添加并标准化(前馈之后):与步骤 2 类似,将前馈输出添加到此阶段的输入(残差连接),然后进行 dropout 并使用 norm2 进行标准化。
- 输出:处理后的张量作为编码器层的输出返回。
class EncoderLayer(nn.Module):
def __init__(self, d_model, num_heads, d_ff, dropout):
super(EncoderLayer, self).__init__()
self.self_attn = MultiHeadAttention(d_model, num_heads)
self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x, mask):
attn_output = self.self_attn(x, x, x, mask)
x = self.norm1(x + self.dropout(attn_output))
ff_output = self.feed_forward(x)
x = self.norm2(x + self.dropout(ff_output))
return x
三 构建解码器模块
参数:
- d_model:输入的维数。
- num_heads:多头注意力机制中注意力头的数量。
- d_ff:前馈网络内层的维数。
- dropout:正则化的 dropout 率。
成分:
- self.self_attn:针对目标序列的多头自注意机制。
- self.cross_attn:关注编码器输出的多头注意力机制。
- self.feed_forward:位置前馈神经网络。
- self.norm1、self.norm2、self.norm3:层规范化组件。
- self.dropout:用于正则化的 Dropout 层。
class DecoderLayer(nn.Module):
def __init__(self, d_model, num_heads, d_ff, dropout):
super(DecoderLayer, self).__init__()
self.self_attn = MultiHeadAttention(d_model, num_heads)
self.cross_attn = MultiHeadAttention(d_model, num_heads)
self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.norm3 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x, enc_output, src_mask, tgt_mask):
attn_output = self.self_attn(x, x, x, tgt_mask)
x = self.norm1(x + self.dropout(attn_output))
attn_output = self.cross_attn(x, enc_output, enc_output, src_mask)
x = self.norm2(x + self.dropout(attn_output))
ff_output = self.feed_forward(x)
x = self.norm3(x + self.dropout(ff_output))
return x
输入:
- x:解码器层的输入。
- enc_output:来自相应编码器的输出(用于交叉注意步骤)。
- src_mask:源掩码,用于忽略编码器输出的某些部分。
- tgt_mask:目标掩码忽略解码器输入的某些部分。
处理步骤:
- 目标序列上的自注意力:通过自注意力机制处理输入x。
- 添加并规范化(自注意力之后):将自注意力的输出添加到原始 x 中,然后进行 dropout 并使用 norm1 进行规范化。
- 与编码器输出的交叉注意:上一步的标准化输出通过交叉注意机制进行处理,该机制关注编码器的输出 enc_output。
- 添加并规范化(交叉注意之后):将交叉注意的输出添加到此阶段的输入中,然后进行 dropout 并使用 norm2 进行规范化。
- 前馈网络:上一步的输出通过前馈网络。
- 添加并标准化(前馈之后):将前馈输出添加到此阶段的输入,然后进行 dropout 并使用 norm3 进行标准化。
- 输出:处理后的张量作为解码器层的输出返回。
概括:
DecoderLayer 类定义 Transformer 解码器的单层。它由多头自注意力机制、多头交叉注意力机制(关注编码器的输出)、位置前馈神经网络以及相应的残差连接、层规范化和 dropout 层组成。这种组合使解码器能够根据编码器的表示生成有意义的输出,同时考虑目标序列和源序列。与编码器一样,多个解码器层通常堆叠在一起以形成 Transformer 模型的完整解码器部分。
接下来,将编码器和解码器模块组合在一起,构建完整的Transformer模型。
四 结合编码器和解码器层来创建完整的 Transformer 网络
构造函数采用以下参数:
- src_vocab_size:源词汇量。
- tgt_vocab_size:目标词汇量。
- d_model:模型嵌入的维数。
- num_heads:多头注意力机制中注意力头的数量。
- num_layers:编码器和解码器的层数。
- d_ff:前馈网络内层的维数。
- max_seq_length:位置编码的最大序列长度。
- dropout:正则化的 Dropout 率。
它定义了以下组件:
- self.encoder_embedding:源序列的嵌入层。
- self.decoder_embedding:目标序列的嵌入层。
- self.positional_encoding:位置编码组件。
- self.encoder_layers:编码器层列表。
- self.decoder_layers:解码器层列表。
- self.fc:最终完全连接(线性)层映射到目标词汇大小。
- self.dropout:Dropout层。
class Transformer(nn.Module):
def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
super(Transformer, self).__init__()
self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
self.positional_encoding = PositionalEncoding(d_model, max_seq_length)
self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
self.fc = nn.Linear(d_model, tgt_vocab_size)
self.dropout = nn.Dropout(dropout)
def generate_mask(self, src, tgt):
src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
seq_length = tgt.size(1)
nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length), diagonal=1)).bool()
tgt_mask = tgt_mask & nopeak_mask
return src_mask, tgt_mask
def forward(self, src, tgt):
src_mask, tgt_mask = self.generate_mask(src, tgt)
src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))
enc_output = src_embedded
for enc_layer in self.encoder_layers:
enc_output = enc_layer(enc_output, src_mask)
dec_output = tgt_embedded
for dec_layer in self.decoder_layers:
dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)
output = self.fc(dec_output)
return output
六 训练
6.1 样本数据准备
src_vocab_size = 5000
tgt_vocab_size = 5000
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
max_seq_length = 100
dropout = 0.1
transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout)
# Generate random sample data
src_data = torch.randint(1, src_vocab_size, (64, max_seq_length)) # (batch_size, seq_length)
tgt_data = torch.randint(1, tgt_vocab_size, (64, max_seq_length)) # (batch_size, seq_length)
超参数
超参数:
这些值定义了变压器模型的架构和行为:
- src_vocab_size,tgt_vocab_size:源序列和目标序列的词汇大小,均设置为 5000。
- d_model:模型嵌入的维数,设置为 512。
- num_heads:多头注意力机制中注意力头的数量,设置为8。
- num_layers:编码器和解码器的层数,设置为 6。
- d_ff:前馈网络内层的维数,设置为 2048。
- max_seq_length:位置编码的最大序列长度,设置为 100。
- dropout:正则化的 Dropout 率,设置为 0.1。
生成随机样本数据:
以下行生成随机源和目标序列:
- src_data:1 到 src_vocab_size 之间的随机整数,表示一批形状为 (64, max_seq_length) 的源序列。
- tgt_data:1 到 tgt_vocab_size 之间的随机整数,代表一批形状为 (64, max_seq_length) 的目标序列。
- 这些随机序列可作为 Transformer 模型的输入,模拟一批包含 64 个示例和长度为 100 的序列的数据。
概括:
代码片段演示了如何初始化转换器模型并生成可输入到模型中的随机源序列和目标序列。所选的超参数决定了转换器的具体结构和属性。此设置可以是更大脚本的一部分,其中模型在实际的序列到序列任务(例如机器翻译或文本摘要)上进行训练和评估。
2 训练模型
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)
transformer.train()
for epoch in range(100):
optimizer.zero_grad()
output = transformer(src_data, tgt_data[:, :-1])
loss = criterion(output.contiguous().view(-1, tgt_vocab_size), tgt_data[:, 1:].contiguous().view(-1))
loss.backward()
optimizer.step()
print(f"Epoch: {epoch+1}, Loss: {loss.item()}")
损失函数和优化器:
- criterion = nn.CrossEntropyLoss(ignore_index=0):将损失函数定义为交叉熵损失。ignore_index 参数设置为 0,这意味着损失不会考虑索引为 0 的目标(通常为填充标记保留)。
- optimizer = optim.Adam(...):将优化器定义为 Adam,学习率为 0.0001,并具有特定的 beta 值。
模型训练模式:
- transformer.train():将 transformer 模型设置为训练模式,启用仅在训练期间适用的行为(如 dropout)。
训练循环:
代码片段使用典型的训练循环对模型进行 100 次训练:
- for epoch in range(100):迭代 100 个训练时期。
- optimizer.zero_grad():清除上一次迭代的梯度。
- output = transformer(src_data, tgt_data[:, :-1]):将源数据和目标数据(不包括每个序列中的最后一个标记)传递到转换器。这在序列到序列任务中很常见,其中目标会移动一个标记。
- loss = criterion(...):计算模型预测与目标数据(不包括每个序列中的第一个标记)之间的损失。损失是通过将数据重塑为一维张量并使用交叉熵损失函数来计算的。
- loss.backward():计算相对于模型参数的损失的梯度。
- optimizer.step():使用计算的梯度更新模型的参数。
- print(f"Epoch: {epoch+1}, Loss: {loss.item()}"):打印当前 epoch 号和该 epoch 的损失值。
3 性能评估
transformer.eval()
# Generate random sample validation data
val_src_data = torch.randint(1, src_vocab_size, (64, max_seq_length)) # (batch_size, seq_length)
val_tgt_data = torch.randint(1, tgt_vocab_size, (64, max_seq_length)) # (batch_size, seq_length)
with torch.no_grad():
val_output = transformer(val_src_data, val_tgt_data[:, :-1])
val_loss = criterion(val_output.contiguous().view(-1, tgt_vocab_size), val_tgt_data[:, 1:].contiguous().view(-1))
print(f"Validation Loss: {val_loss.item()}")
评估模式:
- transformer.eval():将 transformer 模型置于评估模式。这很重要,因为它会关闭仅在训练期间使用的某些行为(如 dropout)。
生成随机验证数据:
- val_src_data:1 和 src_vocab_size 之间的随机整数,代表一批形状为 (64, max_seq_length) 的验证源序列。
- val_tgt_data:1 到 tgt_vocab_size 之间的随机整数,代表一批形状为 (64, max_seq_length) 的验证目标序列。
验证循环:
- 使用 torch.no_grad():禁用梯度计算,因为我们不需要在验证期间计算梯度。这可以减少内存消耗并加快计算速度。
- val_output = transformer(val_src_data, val_tgt_data[:, :-1]): 将验证源数据和验证目标数据(不包括每个序列中的最后一个标记)通过转换器。
- val_loss = criterion(...):计算模型预测与验证目标数据(不包括每个序列中的第一个标记)之间的损失。通过将数据重塑为一维张量并使用先前定义的交叉熵损失函数来计算损失。
- print(f"Validation Loss: {val_loss.item()}"):打印验证损失值。
概括:
此代码片段在随机生成的验证数据集上评估转换器模型,计算验证损失并打印出来。在实际场景中,随机验证数据应替换为您正在处理的任务中的实际验证数据。验证损失可以指示您的模型在看不见的数据上的表现如何,这是衡量模型泛化能力的关键指标。
七 面试题
"""
Created on Wed Sep 4 21:48:26 2024
@author: cxf
"""
import torch
import torch.nn as nn
import math
import torch.optim as optim
class MultiHeadAttention(nn.Module):
def __init__(self, d_model, num_heads):
super(MultiHeadAttention,self).__init__()
#确保模型维度d_model能被头数heads整除
assert d_model%num_heads ==0
#initialize dimensions
self.d_model = d_model #Model's dimension
self.num_heads = num_heads #number of attention heads
self.d_k = d_model//num_heads
#Linear layers for transforming inputs
self.W_q = nn.Linear(d_model, d_model) #query transformation
self.W_k = nn.Linear(d_model, d_model) #key transformation
self.W_v = nn.Linear(d_model, d_model) #value transformation
self.W_o= nn.Linear(d_model, d_model) #output transformation
def scaled_dot_product_attention(self,Q, K,V, mask=None):
#calculate attention scores[batch_size,num_heads,seq_length=100,99,d_k]
attn_scores = torch.matmul(Q,K.transpose(-2,-1))/math.sqrt(self.d_k)
#Apply mask if provided (useful for preventing attention to certain parts like padding)
if mask is not None:
attn_scores = attn_scores.masked_fill(mask==0,-1e9)
attn_probs = torch.softmax(attn_scores,dim=-1)
# Multiply by values to obtain the final output
output = torch.matmul(attn_probs,V)
return output
def split_heads(self, x):
# Reshape the input to have num_heads for multi-head attention
batch_size, seq_length, d_model = x.size()
#output.shape =(batch_size,num_heads, seq_length,d_k)
output = x.view(batch_size,seq_length,self.num_heads,self.d_k).transpose(1,2)
return output
def combine_heads(self, x):
# Combine the multiple heads back to original shape
batch_size, nheads, seq_length, d_k = x.size()
output = x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)
return output
def forward(self, Q, K, V, mask=None):
# Apply linear transformations and split heads
Q = self.split_heads(self.W_q(Q))
K = self.split_heads(self.W_k(K))
V = self.split_heads(self.W_v(V))
# Perform scaled dot-product attention
attn_output = self.scaled_dot_product_attention(Q, K, V, mask)
# Combine heads and apply output transformation [batch_size, seq_length, d_model]
output = self.W_o(self.combine_heads(attn_output))
return output
class PositionWiseFeedForward(nn.Module):
def __init__(self, d_model, d_ff):
super(PositionWiseFeedForward, self).__init__()
self.fc1 = nn.Linear(d_model, d_ff)
self.fc2 = nn.Linear(d_ff, d_model)
self.relu = nn.ReLU()
def forward(self, x):
hidden = self.relu(self.fc1(x))
output = self.fc2(hidden)
return output
class PositionalEncoding(nn.Module):
def __init__(self, d_model,max_seq_length):
super(PositionalEncoding,self).__init__()
pe = torch.zeros(max_seq_length, d_model)
position = torch.arange(0, max_seq_length).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
#div_term =torch.pow(10000, torch.arange(0, d_model, 2)/d_model)
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
self.register_buffer('pe', pe.unsqueeze(0))
def forward(self, x):
output = x+self.pe[:,:x.size(1)]
return output
class AddNorm(nn.Module):
def __init__(self, d_model=512,dropout=0.1):
super(AddNorm,self).__init__()
self.norm = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x,y):
output = x+ self.dropout(y)
output = self.norm(output)
return output
class EncoderLayer(nn.Module):
def __init__(self, d_model=512, num_heads=8, d_ff=2048,dropout=0):
super(EncoderLayer,self).__init__()
self.self_attn = MultiHeadAttention(d_model, num_heads)
self.addnorm1 = AddNorm(d_model,dropout)
self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
self.addnorm2 = AddNorm(d_model,dropout)
def forward(self, x, mask):
attn_output = self.self_attn(x,x,x,mask)
x = self.addnorm1(x,attn_output)
ffn_output = self.feed_forward(x)
x = self.addnorm2(x,ffn_output)
return x
class DecoderLayer(nn.Module):
def __init__(self, d_model, num_heads, d_ff, dropout):
super(DecoderLayer,self).__init__()
self.self_attn = MultiHeadAttention(d_model, num_heads)
self.addnorm1 = AddNorm(d_model,dropout)
self.cross_attn = MultiHeadAttention(d_model, num_heads)
self.addnorm2 = AddNorm(d_model,dropout)
self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
self.addnorm3 = AddNorm(d_model,dropout)
def forward(self,x, enc_output, encoder_mask, decoder_mask):
attn_output = self.self_attn(x,x,x,decoder_mask)
x = self.addnorm1(x, attn_output)
attn_output = self.cross_attn(x, enc_output,enc_output,encoder_mask)
x = self.addnorm2(x, attn_output)
ffn_output = self.feed_forward(x)
x = self.addnorm3(x,ffn_output)
return x
class Transformer(nn.Module):
def __init__(self, src_vocab_size=1000, tgt_vocab_size=1000,d_model=512,num_heads=8,num_layers=6,d_ff=2048,max_seq_length=100,dropout=0.1):
super(Transformer, self).__init__()
self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
self.positional_encoding = PositionalEncoding(d_model, max_seq_length)
self.encoder_layers = nn.ModuleList([EncoderLayer(d_model,num_heads,d_ff,dropout) for _ in range(num_layers)])
self.decoder_layers = nn.ModuleList([DecoderLayer(d_model,num_heads,d_ff,dropout) for _ in range(num_layers)])
self.fc = nn.Linear(d_model, tgt_vocab_size)
self.dropout = nn.Dropout(dropout)
def generate_mask(self, src, tgt):
src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
print("\n src_mask",src_mask.shape, "\t tgt_mask",tgt_mask.shape)
batch_size,seq_length = tgt.shape
nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length), diagonal=1)).bool()
tgt_mask = tgt_mask & nopeak_mask
return src_mask, tgt_mask
def forward(self, src, tgt):
src_mask,tgt_mask = self.generate_mask(src,tgt)
src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))
enc_output = src_embedded
for enc_layer in self.encoder_layers:
enc_output = enc_layer(enc_output, src_mask)
dec_output = tgt_embedded
for dec_layer in self.decoder_layers:
dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)
output = self.fc(dec_output)
return output
def eval(transformer,criterion):
#验证
transformer.eval()
src_vocab_size = 1000
max_seq_length = 100
tgt_vocab_size = 1000
# Generate random sample validation data
val_src_data = torch.randint(1, src_vocab_size, (64, max_seq_length)) # (batch_size, seq_length)
val_tgt_data = torch.randint(1, tgt_vocab_size, (64, max_seq_length)) # (batch_size, seq_length)
with torch.no_grad():
val_output = transformer(val_src_data, val_tgt_data[:, :-1])
val_loss = criterion(val_output.contiguous().view(-1, tgt_vocab_size), val_tgt_data[:, 1:].contiguous().view(-1))
print(f"Validation Loss: {val_loss.item()}")
def train():
#训练
src_vocab_size = 5000
tgt_vocab_size = 5000
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
max_seq_length = 100
dropout = 0.1
batch_size = 64
dropout = 0.1
maxIter = 100
transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout)
# Generate random sample data
src_data = torch.randint(1, src_vocab_size, (batch_size, max_seq_length)) # (batch_size, seq_length)
tgt_data = torch.randint(1, tgt_vocab_size, (batch_size, max_seq_length)) # (batch_size, seq_length)
print(src_data.shape)
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)
transformer.train()
for epoch in range(maxIter):
optimizer.zero_grad()
output = transformer(src_data, tgt_data[:, :-1])
inputs = output.contiguous().view(-1, tgt_vocab_size)
targets = tgt_data[:, 1:].contiguous().view(-1)
print("\n shape ",inputs.shape, targets.shape,output.shape,tgt_data.shape,tgt_data[:, :-1].shape)
loss = criterion(inputs,targets)
loss.backward()
optimizer.step()
print(f"Epoch: {epoch+1}, Loss: {loss.item()}")
if __name__ == "__main__":
train()
常见面试题_transformer面试题-CSDN博客