- 🍨 本文为🔗365天深度学习训练营 中的学习记录博客
- 🍦 参考文章:TensorFlow入门实战|第3周:天气识别
- 🍖 原作者:K同学啊|接辅导、项目定制
一、多头注意力机制
import torch
import torch.nn as nn
class MultiHeadAttention(nn.Module):
def __init__(self, hid_dim, n_heads):
super(MultiHeadAttention, self).__init__()
assert hid_dim % n_heads == 0
self.hid_dim = hid_dim
self.n_heads = n_heads
self.w_q = nn.Linear(hid_dim, hid_dim)
self.w_k = nn.Linear(hid_dim, hid_dim)
self.w_v = nn.Linear(hid_dim, hid_dim)
self.fc = nn.Linear(hid_dim, hid_dim)
self.scale = torch.sqrt(torch.FloatTensor([hid_dim // n_heads]))
def forward(self, query, key, value, mask=None):
bsz = query.shape[0]
Q = self.w_q(query)
K = self.w_k(key)
V = self.w_v(value)
Q = Q.view(bsz, -1, self.n_heads, self.hid_dim // self.n_heads).permute(0, 2, 1, 3)
K = K.view(bsz, -1, self.n_heads, self.hid_dim // self.n_heads).permute(0, 2, 1, 3)
V = V.view(bsz, -1, self.n_heads, self.hid_dim // self.n_heads).permute(0, 2, 1, 3)
energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale
if mask is not None:
energy = energy.masked_fill(mask == 0, -1e10)
attention = torch.softmax(energy, dim=-1)
x = torch.matmul(attention, V)
x = x.permute(0, 2, 1, 3).contiguous()
x = x.view(bsz, -1, self.n_heads * (self.hid_dim // self.n_heads))
x = self.fc(x)
return x
二、前馈神经网络
class Feedforward(nn.Module):
def __init__(self, d_model, d_ff, dropout=0.1):
super(Feedforward, self).__init__()
# 两层线性变换和ReLU激活函数
self.linear1 = nn.Linear(d_model, d_ff)
self.dropout = nn.Dropout(dropout)
self.linear2 = nn.Linear(d_ff, d_model)
def forward(self, x):
x = torch.nn.functional.relu(self.linear1(x))
x = self.dropout(x)
x = self.linear2(x)
return x
三、位置编码
class PositionalEncoding(nn.Module):
"实现位置编码"
def __init__(self, d_model, dropout, max_len=5000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout)
# 初始化一个Shape为(max_len, d_model)的位置编码矩阵
pe = torch.zeros(max_len, d_model).to(device)
# 初始化一个tensor [[0, 1, 2, 3, ...]]
position = torch.arange(0, max_len).unsqueeze(1)
# 这里计算sin和cos中的频率项,通过e的指数进行变换
div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term) # 偶数位置填充sin
pe[:, 1::2] = torch.cos(position * div_term) # 奇数位置填充cos
pe = pe.unsqueeze(0) # 为了方便批处理,增加一个可以unsqueeze出一个`batch`
# 将pe注册为一个不参与梯度反传,但又希望保存在model的state_dict中的持久化buffer
# 这个操作使得pe可以在forward中使用,但是不会被视为模型的一个可训练参数
self.register_buffer('pe', pe)
def forward(self, x):
"""
将embedding后的inputs,例如(1, 7, 128),batch size为1, 7个单词,每个单词的嵌入为128
"""
# 将x与positional encoding相加。
x = x + self.pe[:, :x.size(1)].requires_grad_(False)
return self.dropout(x)
四、编码层
class EncoderLayer(nn.Module):
def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
super(EncoderLayer, self).__init__()
# 自注意力层和前馈神经网络层初始化及残差连接和层归一化
self.self_attn = MultiHeadAttention(d_model, n_heads, dropout)
self.feedforward = Feedforward(d_model, d_ff, dropout)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x, mask):
# 自注意力机制
attn_output = self.self_attn(x, x, x, mask)
x = x + self.dropout(attn_output)
x = self.norm1(x)
# 前馈神经网络
ff_output = self.feedforward(x)
x = x + self.dropout(ff_output)
x = self.norm2(x)
return x
五、解码层
class DecoderLayer(nn.Module):
def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
super(DecoderLayer, self).__init__()
# 自注意力层和前馈神经网络层初始化及残差连接和层归一化
self.self_attn = MultiHeadAttention(d_model, n_heads, dropout)
self.enc_attn = MultiHeadAttention(d_model, n_heads, dropout)
self.feedforward = Feedforward(d_model, d_ff, dropout)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.norm3 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x, enc_output, self_mask, context_mask):
# 自注意力机制
attn_output = self.self_attn(x, x, x, self_mask)
x = x + self.dropout(attn_output)
x = self.norm1(x)
# 编码器-解码器注意力机制
attn_output = self.enc_attn(x, enc_output, enc_output, context_mask)
x = x + self.dropout(attn_output)
x = self.norm2(x)
# 前馈神经网络
ff_output = self.feedforward(x)
x = x + self.dropout(ff_output)
x = self.norm3(x)
return x
六、Transformer模型
这段代码是构建Transformer模型的核心,通常用于机器翻译、文本摘要、问答系统等自然语言处理任务。
class Transformer(nn.Module):
def __init__(self, vocab_size, d_model, n_heads, n_encoder_layers, n_decoder_layers, d_ff, dropout=0.1):
super(Transformer, self).__init__()
# Transformer模型包含嵌入层、位置编码、编码器和解码器层以及输出层
self.embedding = nn.Embedding(vocab_size, d_model)
self.positional_encoding = PositionalEncoding(d_model, dropout)
self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_encoder_layers)])
self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_decoder_layers)])
self.fc_out = nn.Linear(d_model, vocab_size)
self.dropout = nn.Dropout(dropout)
def forward(self, src, trg, src_mask, trg_mask):
# 嵌入层和位置编码
src = self.embedding(src)
src = self.positional_encoding(src)
trg = self.embedding(trg)
trg = self.positional_encoding(trg)
# 编码器层
for layer in self.encoder_layers:
src = layer(src, src_mask)
# 解码器层
for layer in self.decoder_layers:
trg = layer(trg, src, trg_mask, src_mask)
# 输出层
output = self.fc_out(trg)
return output
self.embedding
: 嵌入层,用于将输入词汇转换成固定大小的向量。self.positional_encoding
: 位置编码层,给嵌入向量添加位置信息。self.encoder_layers
: 编码器层,是一个模块列表,其中包含了多个EncoderLayer
。self.decoder_layers
: 解码器层,是一个模块列表,其中包含了多个DecoderLayer
。self.fc_out
: 线性层,将解码器输出转换为最终的词汇大小输出。self.dropout
: 丢弃层,用于训练中的正则化。
七、初始化模型
使用PyTorch框架初始化一个Transformer模型,并为模型的源和目标序列生成随机数据以及对应的掩码。
# 模型示例
vocab_size = 10000 # 假设词汇表大小为10000
d_model = 512
n_heads = 8
n_encoder_layers = 6
n_decoder_layers = 6
d_ff = 2048
dropout = 0.1
transformer_model = Transformer(vocab_size, d_model, n_heads, n_encoder_layers, n_decoder_layers, d_ff, dropout)
# 生成输入,这里的输入是随机的,需要根据实际情况修改
src = torch.randint(0, vocab_size, (32, 10)) # 假设源序列长度为10
trg = torch.randint(0, vocab_size, (32, 20)) # 假设目标序列长度为20
# 生成掩码,用于屏蔽序列中的填充位置
src_mask = (src != 0).unsqueeze(1).unsqueeze(2) # 源序列掩码
trg_mask = (trg != 0).unsqueeze(1).unsqueeze(2) # 目标序列掩码
# 模型前向传播
output = transformer_model(src, trg, src_mask, trg_mask)
print(output.shape)
代码通过模型进行前向传播,并打印输出的形状。根据Transformer模型的设计,输出的形状应该是(batch_size, trg_seq_len, vocab_size)
,在这个例子中是(32, 20, 10000)
。这表示对于每个批次中的32个样本的每个位置,模型都会输出一个10000维的向量,向量表示每个词汇的分数或概率。
完整代码如下:
import torch
import torch.nn as nn
import math
# 指定设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class MultiHeadAttention(nn.Module):
def __init__(self, hid_dim, n_heads):
super(MultiHeadAttention, self).__init__()
assert hid_dim % n_heads == 0
self.hid_dim = hid_dim
self.n_heads = n_heads
self.w_q = nn.Linear(hid_dim, hid_dim)
self.w_k = nn.Linear(hid_dim, hid_dim)
self.w_v = nn.Linear(hid_dim, hid_dim)
self.fc = nn.Linear(hid_dim, hid_dim)
self.scale = torch.sqrt(torch.FloatTensor([hid_dim // n_heads]))
def forward(self, query, key, value, mask=None):
bsz = query.shape[0]
Q = self.w_q(query)
K = self.w_k(key)
V = self.w_v(value)
Q = Q.view(bsz, -1, self.n_heads, self.hid_dim // self.n_heads).permute(0, 2, 1, 3)
K = K.view(bsz, -1, self.n_heads, self.hid_dim // self.n_heads).permute(0, 2, 1, 3)
V = V.view(bsz, -1, self.n_heads, self.hid_dim // self.n_heads).permute(0, 2, 1, 3)
energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale
if mask is not None:
energy = energy.masked_fill(mask == 0, -1e10)
attention = torch.softmax(energy, dim=-1)
x = torch.matmul(attention, V)
x = x.permute(0, 2, 1, 3).contiguous()
x = x.view(bsz, -1, self.n_heads * (self.hid_dim // self.n_heads))
x = self.fc(x)
return x
class Feedforward(nn.Module):
def __init__(self, d_model, d_ff, dropout=0.1):
super(Feedforward, self).__init__()
# 两层线性变换和ReLU激活函数
self.linear1 = nn.Linear(d_model, d_ff)
self.dropout = nn.Dropout(dropout)
self.linear2 = nn.Linear(d_ff, d_model)
def forward(self, x):
x = torch.nn.functional.relu(self.linear1(x))
x = self.dropout(x)
x = self.linear2(x)
return x
class PositionalEncoding(nn.Module):
"实现位置编码"
def __init__(self, d_model, dropout, max_len=5000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout)
# 初始化一个Shape为(max_len, d_model)的位置编码矩阵
pe = torch.zeros(max_len, d_model).to(device)
# 初始化一个tensor [[0, 1, 2, 3, ...]]
position = torch.arange(0, max_len).unsqueeze(1)
# 这里计算sin和cos中的频率项,通过e的指数进行变换
div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term) # 偶数位置填充sin
pe[:, 1::2] = torch.cos(position * div_term) # 奇数位置填充cos
pe = pe.unsqueeze(0) # 为了方便批处理,增加一个可以unsqueeze出一个`batch`
# 将pe注册为一个不参与梯度反传,但又希望保存在model的state_dict中的持久化buffer
# 这个操作使得pe可以在forward中使用,但是不会被视为模型的一个可训练参数
self.register_buffer('pe', pe)
def forward(self, x):
"""
将embedding后的inputs,例如(1, 7, 128),batch size为1, 7个单词,每个单词的嵌入为128
"""
# 将x与positional encoding相加。
x = x + self.pe[:, :x.size(1)].requires_grad_(False)
return self.dropout(x)
class EncoderLayer(nn.Module):
def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
super(EncoderLayer, self).__init__()
# 自注意力层和前馈神经网络层初始化及残差连接和层归一化
self.self_attn = MultiHeadAttention(d_model, n_heads)
self.feedforward = Feedforward(d_model, d_ff, dropout)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x, mask):
# 自注意力机制
attn_output = self.self_attn(x, x, x, mask)
x = x + self.dropout(attn_output)
x = self.norm1(x)
# 前馈神经网络
ff_output = self.feedforward(x)
x = x + self.dropout(ff_output)
x = self.norm2(x)
return x
class DecoderLayer(nn.Module):
def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
super(DecoderLayer, self).__init__()
# 自注意力层和前馈神经网络层初始化及残差连接和层归一化
self.self_attn = MultiHeadAttention(d_model, n_heads )
self.enc_attn = MultiHeadAttention(d_model, n_heads )
self.feedforward = Feedforward(d_model, d_ff, dropout)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.norm3 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x, enc_output, self_mask, context_mask):
# 自注意力机制
attn_output = self.self_attn(x, x, x, self_mask)
x = x + self.dropout(attn_output)
x = self.norm1(x)
# 编码器-解码器注意力机制
attn_output = self.enc_attn(x, enc_output, enc_output, context_mask)
x = x + self.dropout(attn_output)
x = self.norm2(x)
# 前馈神经网络
ff_output = self.feedforward(x)
x = x + self.dropout(ff_output)
x = self.norm3(x)
return x
class Transformer(nn.Module):
def __init__(self, vocab_size, d_model, n_heads, n_encoder_layers, n_decoder_layers, d_ff, dropout=0.1):
super(Transformer, self).__init__()
# Transformer模型包含嵌入层、位置编码、编码器和解码器层以及输出层
self.embedding = nn.Embedding(vocab_size, d_model)
self.positional_encoding = PositionalEncoding(d_model, dropout)
self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_encoder_layers)])
self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_decoder_layers)])
self.fc_out = nn.Linear(d_model, vocab_size)
self.dropout = nn.Dropout(dropout)
def forward(self, src, trg, src_mask, trg_mask):
# 嵌入层和位置编码
src = self.embedding(src)
src = self.positional_encoding(src)
trg = self.embedding(trg)
trg = self.positional_encoding(trg)
# 编码器层
for layer in self.encoder_layers:
src = layer(src, src_mask)
# 解码器层
for layer in self.decoder_layers:
trg = layer(trg, src, trg_mask, src_mask)
# 输出层
output = self.fc_out(trg)
return output
# 模型示例
vocab_size = 10000 # 假设词汇表大小为10000
d_model = 512
n_heads = 8
n_encoder_layers = 6
n_decoder_layers = 6
d_ff = 2048
dropout = 0.1
transformer_model = Transformer(vocab_size, d_model, n_heads, n_encoder_layers, n_decoder_layers, d_ff, dropout)
# 生成输入,这里的输入是随机的,需要根据实际情况修改
src = torch.randint(0, vocab_size, (32, 10)) # 假设源序列长度为10
trg = torch.randint(0, vocab_size, (32, 20)) # 假设目标序列长度为20
# 生成掩码,用于屏蔽序列中的填充位置
src_mask = (src != 0).unsqueeze(1).unsqueeze(2) # 源序列掩码
trg_mask = (trg != 0).unsqueeze(1).unsqueeze(2) # 目标序列掩码
# 模型前向传播
output = transformer_model(src, trg, src_mask, trg_mask)
print(output.shape)