代码来源
本次代码来源自github https://github.com/graykode/nlp-tutorial
里面的5.1 transformer代码
第一步 数据准备(从main函数开始)
首先这里是自定义了三句话,分别是给到encoder的输入和decoder的输入还有测试的输入
sentences = ['ich mochte ein bier P', 'S i want a beer', 'i want a beer E']
然后把这些通过词向量编号变成,对应的向量。
#对应的词向量编号
src_vocab = {'P': 0, 'ich': 1, 'mochte': 2, 'ein': 3, 'bier': 4}
src_vocab_size = len(src_vocab)
tgt_vocab = {'P': 0, 'i': 1, 'want': 2, 'a': 3, 'beer': 4, 'S': 5, 'E': 6}
number_dict = {i: w for i, w in enumerate(tgt_vocab)}
tgt_vocab_size = len(tgt_vocab)
src_len = 5 # 我们训练的时候的源序列长度
tgt_len = 5 # 测试集语句的长度
d_model = 512 # Embedding Size 词嵌入的维度 可以理解为一个词有512个维度 它被分解成了具有一个512维度特征的向量 更浅显易懂的说就是把“我”这个词和512种不同的词作比较从而得到向量
d_ff = 2048 # 这个先不管 是前馈神经网络所需要提高的维度
d_k = d_v = 64 # 特征k v 的特征维度
n_layers = 6 # Encoder 和 Decoder 的数量 也就是大N
n_heads = 8 # 把qkv所需要分的头数 但是要注意 n_heads * d_k = d_model 为什么?因为输入的d_model被划分成8个子空间(也就是头数)这意味着每个头输出的维度是64计算完成后拼接起来
经过make_batch函数我们得到encoder和decoder的向量
enc_inputs, dec_inputs, target_batch = make_batch(sentences)
那他们是什么形状的呢?如下图 看到一句话是[1,5]也就是
make_batch函数
其实很简单 这个函数跑就是把词对应的编号一 一和我们的句子对应起来 比如S对应5. 以此类推
#用于将句子转换为批量数据
def make_batch(sentences):
# 将输入句子转换为词汇表的索引
input_batch = [[src_vocab[n] for n in sentences[0].split()]]
# 将输出句子转换为词汇表的索引
output_batch = [[tgt_vocab[n] for n in sentences[1].split()]]
# 将目标句子转换为词汇表的索引
target_batch = [[tgt_vocab[n] for n in sentences[2].split()]]
# 返回批量数据
return torch.LongTensor(input_batch), torch.LongTensor(output_batch), torch.LongTensor(target_batch)
那么接下来就来到重要的模型定义和训练
model = Transformer()# Transformer模型
criterion = nn.CrossEntropyLoss()#交叉熵损失函数
optimizer = optim.Adam(model.parameters(), lr=0.001)# 优化器
for epoch in range(20):
optimizer.zero_grad()
outputs, enc_self_attns, dec_self_attns, dec_enc_attns = model(enc_inputs, dec_inputs)
loss = criterion(outputs, target_batch.contiguous().view(-1))
print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))
loss.backward()
optimizer.step()
Transformer 主体函数定义代码
怎么说呢?主体代码就是根据transformer上面的图一样的,不会的可以看我上一篇博客。开始是encoder层然后是decoder层,最后用一个线性层降维得到我们所需要的结果
class Transformer(nn.Module):
# 初始化函数
def __init__(self):
# 调用父类的初始化函数
super(Transformer, self).__init__()
# 定义编码器
self.encoder = Encoder()
# 定义解码器
self.decoder = Decoder()
# 定义一个线性层,将d_model个神经元的编码器输出转化为目标词汇表大小个神经元的输出,不添加偏置
self.projection = nn.Linear(d_model, tgt_vocab_size, bias=False)
# 前向传播函数
def forward(self, enc_inputs, dec_inputs):
# 调用编码器
enc_outputs, enc_self_attns = self.encoder(enc_inputs)
# 调用解码器
dec_outputs, dec_self_attns, dec_enc_attns = self.decoder(dec_inputs, enc_inputs, enc_outputs)#enc_inputs是告诉解码端那些是pad符号
# 调用线性层 调整输出形状 就是把我们的decoder输出的d_model维度映射到词汇表维度 在本例子种是512->5
dec_logits = self.projection(dec_outputs) # dec_logits : [batch_size x src_vocab_size x tgt_vocab_size]
# 返回解码 这里是方便我们调用损失函数 因为我们的形状是[batch_size x src_vocab_size x tgt_vocab_size] 所以batch * src_size是我们的总样本数 方便进行交叉熵计算
return dec_logits.view(-1, dec_logits.size(-1)), enc_self_attns, dec_self_attns, dec_enc_attns
Encoder层
词向量维度嵌入
ok 进入我们transformer的第一步解释我们的encodr层,因为transformer里面的第一步是输入到encoder里面
**这里解释一下整个原理 ,我们的encoder必须是先加上位置编码 - - > 然后进行多头注意力机制(其中有用norm操作进行残差连接) --> 前馈神经网络层 ** 简单的三步却让人一生无法释怀!
我将分开讲,首先是定义,很简单,我想说的是这个词嵌入层,可能新手不太懂(比如我),那么你可以简单的理解为维度转换,这里就是把维度为[1,5,5]的数据变成[1,5,512] 我添加了打印代码,调试结果在下面。然后就是pos_emb 这个就是相当与把我们需要训练的数据加上下标,具体就是根据我们的编码表(这里是直接把我们的编码表输入了)获得对应的嵌入。
def __init__(self):
super(Encoder, self).__init__()
# 定义编码器中的词嵌入层
self.src_emb = nn.Embedding(src_vocab_size, d_model)
# 定义编码器中的位置编码层,从预训练的编码表中获取
self.pos_emb = nn.Embedding.from_pretrained(get_sinusoid_encoding_table(src_len+1, d_model),freeze=True)
# 定义编码器中的层
self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])
掩码部分实现
OK进行下一步
这一步很重要!因为我们在训练的时候要要求我们的序列是等长的,但是很多时候我们的词汇并不是 这样,有长有短,所以我们必须用一些特定符号进行标记,这里我们用PAD来说明,但是填充的时候肯定要标记呀,不然把PAD也拿去训练那不是搞笑了吗。
# 获取编码器自注意力时的mask 注意我们的输入都是enc_inputs
enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs)
所以这个函数是这样的
我们的输入都是enc_inputs 为什么?** 这个问题就要从注意力点积开始了。注意力计算的核心操作是对查询(Q)和键(K)之间的点积。假设查询的维度是 [batch_size, len_q, d_k],键的维度是 [batch_size, len_k, d_k],点积结果的维度将会是 [batch_size, len_q, len_k]。
掩码矩阵的维度需要与这个点积结果的维度匹配,以便正确地应用掩码。因此,掩码矩阵的维度必须是 [batch_size, len_q, len_k]。**
def get_attn_pad_mask(seq_q, seq_k):#enc_inputs, enc_inputs 告诉后面的句子后面的层 那些是被pad符号填充的 pad的目的是让batch里面的每一行长度一致
# 获取seq_q和seq_k的batch_size和长度
batch_size, len_q = seq_q.size()
batch_size, len_k = seq_k.size()
# 创建一个全为0的mask矩阵,维度为batch_size x len_k
pad_attn_mask = seq_k.data.eq(0).unsqueeze(1) # batch_size x 1 x len_k(=len_q), one is maskingtensor([[[False, False, False, False, True]]]) True代表为pad符号
# 扩展mask矩阵,使其维度为batch_size x len_q x len_k
a=pad_attn_mask.expand(batch_size, len_q, len_k)
#tensor([[[False, False, False, False, True],
# [False, False, False, False, True],
# [False, False, False, False, True],
# [False, False, False, False, True],
# [False, False, False, False, True]]])
return pad_attn_mask.expand(batch_size, len_q, len_k) # batch_size x len_q x len_k 重复len_q次
做完这些,最后就到了encoder的叠加,也就是N个encoder的叠加,把上一层的输出传给下一层
把上一个encoder的输出变为为下一个encoder的输入,还有encoder的掩码也要传递给下一个encoder,因为它们的掩码是通用的。
# 遍历编码器中的每一层
for layer in self.layers:
# 将输出传递给下一层
enc_outputs, enc_self_attn = layer(enc_outputs, enc_self_attn_mask)
# 将编码器自注意力添加到自注意力列表中 这个列表主要是最后的展示show 没什么用
enc_self_attns.append(enc_self_attn)
# 返回编码器的输出和自注意力列表
多层encoder叠加
那有人会问 ,我们的encoderlayer是怎么样的
以下代码解释了怎么样的
我们可以看到,里面很简单和原文的模型结构一样,一个自注意力层和前馈神经网络。
class EncoderLayer(nn.Module):
def __init__(self):
super(EncoderLayer, self).__init__()
self.enc_self_attn = MultiHeadAttention() # 编码器自注意力层
self.pos_ffn = PoswiseFeedForwardNet() # 位置前馈神经网络
def forward(self, enc_inputs, enc_self_attn_mask):
enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, enc_self_attn_mask) # encoder的输入的qkv特征都是来自一个input所以一样
enc_outputs = self.pos_ffn(enc_outputs) # enc_outputs: [batch_size x len_q x d_model]
return enc_outputs, attn
多头自注意力层
``这个实现的代码就是通过给的input然后乘以权重矩阵Wq Wk Wv来获得 我们的qkv特征,当然我们再这里需要分头,所谓分头就是把我们的d_model特征分成n_head*d_q 或d_v 或d_k个头,然后进行注意力计算达到我们原文描述的效果。在下面注解种有。
class MultiHeadAttention(nn.Module):
def __init__(self):#通过线形层映射一个qkv的特征矩阵
super(MultiHeadAttention, self).__init__()
self.W_Q = nn.Linear(d_model, d_k * n_heads)
self.W_K = nn.Linear(d_model, d_k * n_heads)
self.W_V = nn.Linear(d_model, d_v * n_heads)
self.linear = nn.Linear(n_heads * d_v, d_model)
self.layer_norm = nn.LayerNorm(d_model)
def forward(self, Q, K, V, attn_mask):
# q: [batch_size x len_q x d_model], k: [batch_size x len_k x d_model], v: [batch_size x len_k x d_model]
residual, batch_size = Q, Q.size(0)
# (B, S, D) -proj-> (B, S, D) -split-> (B, S, H, W) -trans-> (B, H, S, W)这里是分头 也就是把q分成q_1 q_2.。。。
q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1,2) # q_s: [batch_size x n_heads x len_q x d_k]
k_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1,2) # k_s: [batch_size x n_heads x len_k x d_k]
v_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1,2) # v_s: [batch_size x n_heads x len_k x d_v]
print("W_Q", self.W_Q.weight)
print("W_K", self.W_K.weight)
print("W_V", self.W_V.weight)
当然对于分的每个头,我们都需要知道他们的掩码(也就是pad符号)所以我们对我们的掩码也进行分头(效果如下图)
attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1) # attn_mask : [batch_size x n_heads x len_q x len_k] 对每个头我们都要知道他们的qkv特征的pad填充在哪所以也要分头
然后是注意力点积计算部分
context, attn = ScaledDotProductAttention()(q_s, k_s, v_s, attn_mask)
函数ScaledDotProductAttention根据原文给的公式计算,我的上一篇博客有
这里注意一个问题,这里可能会问为什么不能先和v乘再进行softmax 因为qk乘以得到的结果矩阵和v维度不一样 再者我们假如乘完再softmax最后会失去其注意力机制的意义 首先因为乘以完v后我们会维度不匹配,scores 的形状是 [batch_size, seq_len, seq_len],V 的形状是 [batch_size, seq_len, d_v]。直接相乘在数学上是行不通的,因为矩阵乘法要求维度匹配。再者原文中只有这qk的公式。
class ScaledDotProductAttention(nn.Module):#这里是注意力机制的计算部分
def __init__(self):
super(ScaledDotProductAttention, self).__init__()
def forward(self, Q, K, V, attn_mask):
scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k) # scores : [batch_size x n_heads x len_q(=len_k) x len_k(=len_q)] 做qk乘法后除以维度
scores.masked_fill_(attn_mask, -1e9) # Fills elements of self tensor with value where mask is one. 把pad符号的值设置为负无穷 因为设置负无穷可以让softmax的值都为0
attn = nn.Softmax(dim=-1)(scores) # 归一化 这里可能会问为什么不能先和v乘再进行softmax 因为qk乘以得到的结果矩阵和v维度不一样 再者我们假如乘完再softmax最后会失去其注意力机制的意义 因为我们要
context = torch.matmul(attn, V)#[1,8,5,5]x[1,8,5,64]=[1,8,5,64] 计算矩阵乘法
return context, attn
然后是把得到的context进行合并用于前馈神经网络降维后再进行残差链接
context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads * d_v) # context: [batch_size x len_q x n_heads * d_v]这里d_v是64
output = self.linear(context)#线性层把512->512便于残差链接
return self.layer_norm(output + residual), attn # output: [batch_size x len_q x d_model]残差链接
前馈神经网络、
self.pos_ffn = PoswiseFeedForwardNet() # 位置前馈神经网络
这里主要是对我们的通道进行缩放注解中有
class PoswiseFeedForwardNet(nn.Module):#对应图的feedforward层 前馈神经网络增强非线性表达能力 具体就是将其低维度卷积->高纬度 用ReLU激活函数->还原低纬度
def __init__(self):
super(PoswiseFeedForwardNet, self).__init__()
self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)#512->2048
self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)#2048->512
self.layer_norm = nn.LayerNorm(d_model)#顺手进行norm操作
#一维卷积 (nn.Conv1d) 需要输入的数据形状为 [batch_size, in_channels, length]。具体来说:
# batch_size:批次大小,表示输入样本的数量。
# in_channels:输入通道数,表示输入特征的维度。
# length:输入序列的长度。
def forward(self, inputs):
residual = inputs # inputs : [batch_size, len_q, d_model]
output = nn.ReLU()(self.conv1(inputs.transpose(1, 2)))#这里因为一维卷积需要维度再前所以要交换特征位置
output = self.conv2(output).transpose(1, 2)
return self.layer_norm(output + residual)
Decodr部分
说到Dcoder 其实Dcoder和Encoder的代码差不多 也就是多了交叉注意力机制。但是Dcoder的输入我们需要注意,他们分别是encoder的输出和输入还有我们额decoder的输入。这个时候疑惑,这里的代码为什么还需要encoder的输入?其实这里的输入就是为了再交叉注意力计算的时候给出encoder 的掩码位置便于计算而已。
# 调用解码器
dec_outputs, dec_self_attns, dec_enc_attns = self.decoder(dec_inputs, enc_inputs, enc_outputs)#enc_inputs是告诉解码端那些是pad符号
初始化代码就是词嵌入和位置编码和encoder一样
class Decoder(nn.Module):
def __init__(self):
super(Decoder, self).__init__()
self.tgt_emb = nn.Embedding(tgt_vocab_size, d_model)
self.pos_emb = nn.Embedding.from_pretrained(get_sinusoid_encoding_table(tgt_len+1, d_model),freeze=True)
self.layers = nn.ModuleList([DecoderLayer() for _ in range(n_layers)])
mask操作
因为在我们训练的时候需要蒙住操作,具体原理如下图
比如输入S的时候只能看到S看不到卷起来 输入卷的时候只能看到S和卷看不到起来
所以形成一个上三角矩阵。
代码如下 核心代码就是最后一句获得我们的mask矩阵
def forward(self, dec_inputs, enc_inputs, enc_outputs): # dec_inputs : [batch_size x target_len]
dec_outputs = self.tgt_emb(dec_inputs) + self.pos_emb(torch.LongTensor([[5,1,2,3,4]]))#和encoder的输入一样,需要加上位置编码
dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs)#解码端的自注意力mask 也就是告诉输入的pad符号不参与自注意力计算 并且告诉哪里是pad符号
dec_self_attn_subsequent_mask = get_attn_subsequent_mask(dec_inputs)#得到decoder输入的上三角mask矩阵
当和pad矩阵两者相加就会得到我们要的矩阵大于0的部分置为1小于等于0 的部分置为0
代码如下
dec_self_attn_mask = torch.gt((dec_self_attn_pad_mask + dec_self_attn_subsequent_mask), 0)#当和pad矩阵两者相加就会得到我们要的矩阵大于0的部分置为1 小于等于0 的部分置为0
交叉注意力层实现
我们知道交叉注意力 层我们需要encoder的qk和我们decoder的v来进行qkv多头注意力矩阵计算矩阵计算
交叉注意力层的mask矩阵 它的输入Quer来自于Masked Multi-Head Attention的输出,Keys和Values来自于Encoder中最后一层的输出。所以dec_input输入的是q enc_inputs输入的是k v
dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs)#交叉注意力层的mask矩阵 它的输入Quer来自于Masked Multi-Head Attention的输出,Keys和Values来自于Encoder中最后一层的输出。所以dec_input输入的是q enc_inputs输入的是k v
然后我们就是decoder的层进行叠加6次,其实只要注意交叉注意力层就行了,但是我们要注意我们的输入要输入encoder的输出enc_outputs和decoder的输出dec_outputs,还有交叉注意力的掩码dec_enc_attn_mask和,自注意力掩码dec_self_attn_mask
dec_self_attns, dec_enc_attns = [], []
for layer in self.layers:
dec_outputs, dec_self_attn, dec_enc_attn = layer(dec_outputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask)
dec_self_attns.append(dec_self_attn)
dec_enc_attns.append(dec_enc_attn)
return dec_outputs, dec_self_attns, dec_enc_attns
class DecoderLayer(nn.Module):
def __init__(self):
super(DecoderLayer, self).__init__()
self.dec_self_attn = MultiHeadAttention()
self.dec_enc_attn = MultiHeadAttention()
self.pos_ffn = PoswiseFeedForwardNet()
def forward(self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask):
dec_outputs, dec_self_attn = self.dec_self_attn(dec_inputs, dec_inputs, dec_inputs, dec_self_attn_mask)#自注意力层
dec_outputs, dec_enc_attn = self.dec_enc_attn(dec_outputs, enc_outputs, enc_outputs, dec_enc_attn_mask)#交互注意力层
dec_outputs = self.pos_ffn(dec_outputs)#前馈神经网络
return dec_outputs, dec_self_attn, dec_enc_attn
前馈神经网络
其实encoder和decoder的前馈神经网络是一样的,都是维度变换压缩然后512->2048 ->512然后进行残差链接。具体可以看我的代码注释
最后一部
就是计算损失…,用测试集测试,将返回的掩码进行展示
为什么返回掩码集合就是以下这样
print('first head of last state enc_self_attns')
showgraph(enc_self_attns)
print('first head of last state dec_self_attns')
showgraph(dec_self_attns)
print('first head of last state dec_enc_attns')
showgraph(dec_enc_attns)
全部代码
# %%
# code by Tae Hwan Jung(Jeff Jung) @graykode, Derek Miller @dmmiller612
# Reference : https://github.com/jadore801120/attention-is-all-you-need-pytorch
# https://github.com/JayParks/transformer
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
# S: Symbol that shows starting of decoding input
# E: Symbol that shows starting of decoding output
# P: Symbol that will fill in blank sequence if current batch data size is short than time steps
# 定义一个函数,用于将句子转换为批量数据
def make_batch(sentences):
# 将输入句子转换为词汇表的索引
input_batch = [[src_vocab[n] for n in sentences[0].split()]]
# 将输出句子转换为词汇表的索引
output_batch = [[tgt_vocab[n] for n in sentences[1].split()]]
# 将目标句子转换为词汇表的索引
target_batch = [[tgt_vocab[n] for n in sentences[2].split()]]
# 返回批量数据
return torch.LongTensor(input_batch), torch.LongTensor(output_batch), torch.LongTensor(target_batch)
# 定义一个函数,用于获取正弦编码表
def get_sinusoid_encoding_table(n_position, d_model):
# 定义一个计算角度的函数
def cal_angle(position, hid_idx):
# 计算位置除以10000的幂次方,再乘以2乘以隐藏层索引除以d_model
return position / np.power(10000, 2 * (hid_idx // 2) / d_model)
# 定义一个获取位置角度向量的函数
def get_posi_angle_vec(position):
# 返回一个列表,列表的元素为计算得到的的角度
return [cal_angle(position, hid_j) for hid_j in range(d_model)]
# 创建一个n_position位置的正弦编码表
sinusoid_table = np.array([get_posi_angle_vec(pos_i) for pos_i in range(n_position)])
# 将正弦编码表的偶数列(dim 2i)设置为sin
sinusoid_table[:, 0::2] = np.sin(sinusoid_table[:, 0::2]) # dim 2i
# 将正弦编码表的奇数列(dim 2i+1)设置为cos
sinusoid_table[:, 1::2] = np.cos(sinusoid_table[:, 1::2]) # dim 2i+1
# 返回FloatTensor类型的正弦编码表
return torch.FloatTensor(sinusoid_table)
def get_attn_pad_mask(seq_q, seq_k):#enc_inputs, enc_inputs 告诉后面的句子后面的层 那些是被pad符号填充的 pad的目的是让batch里面的每一行长度一致
# 获取seq_q和seq_k的batch_size和长度
batch_size, len_q = seq_q.size()
batch_size, len_k = seq_k.size()
# 创建一个全为0的mask矩阵,维度为batch_size x len_k 增加第一个维度,len_k个元素都是0
pad_attn_mask = seq_k.data.eq(0).unsqueeze(1) # batch_size x 1 x len_k(=len_q), one is maskingtensor([[[False, False, False, False, True]]]) True代表为pad符号
# 扩展mask矩阵,使其维度为batch_size x len_q x len_k
a=pad_attn_mask.expand(batch_size, len_q, len_k)#tensor([[[False, False, False, False, True],
# [False, False, False, False, True],
# [False, False, False, False, True],
# [False, False, False, False, True],
# [False, False, False, False, True]]])
return pad_attn_mask.expand(batch_size, len_q, len_k) # batch_size x len_q x len_k 重复len_q次
def get_attn_subsequent_mask(seq):
attn_shape = [seq.size(0), seq.size(1), seq.size(1)]#创建三角矩阵维度
subsequent_mask = np.triu(np.ones(attn_shape), k=1)#创建一个上三角为1的矩阵,为什么要这样 因为我们的mask是 比如[S我爱我家] 从S开始看就是说看S的时候只能看S 看到我的时候只能看过S和我。。。就是单词掩码
subsequent_mask = torch.from_numpy(subsequent_mask).byte()
return subsequent_mask
class ScaledDotProductAttention(nn.Module):#这里是注意力机制的计算部分
def __init__(self):
super(ScaledDotProductAttention, self).__init__()
def forward(self, Q, K, V, attn_mask):
scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k) # scores : [batch_size x n_heads x len_q(=len_k) x len_k(=len_q)] 做qk乘法后除以维度
scores.masked_fill_(attn_mask, -1e9) # Fills elements of self tensor with value where mask is one. 把pad符号的值设置为负无穷 因为设置负无穷可以让softmax的值都为0
attn = nn.Softmax(dim=-1)(scores) # 归一化 这里可能会问为什么不能先和v乘再进行softmax 因为qk乘以得到的结果矩阵和v维度不一样 再者我们假如乘完再softmax最后会失去其注意力机制的意义 因为我们要
context = torch.matmul(attn, V)#[1,8,5,5]x[1,8,5,64]=[1,8,5,64] 计算点积
return context, attn
class MultiHeadAttention(nn.Module):
def __init__(self):#通过线形层映射一个qkv的特征矩阵
super(MultiHeadAttention, self).__init__()
self.W_Q = nn.Linear(d_model, d_k * n_heads)
self.W_K = nn.Linear(d_model, d_k * n_heads)
self.W_V = nn.Linear(d_model, d_v * n_heads)
self.linear = nn.Linear(n_heads * d_v, d_model)
self.layer_norm = nn.LayerNorm(d_model)
def forward(self, Q, K, V, attn_mask):
# q: [batch_size x len_q x d_model], k: [batch_size x len_k x d_model], v: [batch_size x len_k x d_model]
residual, batch_size = Q, Q.size(0)
# (B, S, D) -proj-> (B, S, D) -split-> (B, S, H, W) -trans-> (B, H, S, W)这里是分头 也就是把q分成q_1 q_2.。。。
q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1,2) # q_s: [batch_size x n_heads x len_q x d_k]
k_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1,2) # k_s: [batch_size x n_heads x len_k x d_k]
v_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1,2) # v_s: [batch_size x n_heads x len_k x d_v]
print("W_Q", self.W_Q.weight)
print("W_K", self.W_K.weight)
print("W_V", self.W_V.weight)
attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1) # attn_mask : [batch_size x n_heads x len_q x len_k] 对每个头我们都要知道他们的qkv特征的pad填充在哪所以也要分头
# context: [batch_size x n_heads x len_q x d_v], attn: [batch_size x n_heads x len_q(=len_k) x len_k(=len_q)]
context, attn = ScaledDotProductAttention()(q_s, k_s, v_s, attn_mask)
context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads * d_v) # context: [batch_size x len_q x n_heads * d_v]这里d_v是64
output = self.linear(context)#线性层把512->512便于残差链接
return self.layer_norm(output + residual), attn # output: [batch_size x len_q x d_model]残差链接
class PoswiseFeedForwardNet(nn.Module):#对应图的feedforward层 前馈神经网络增强非线性表达能力 具体就是将其低维度卷积->高纬度 用ReLU激活函数->还原低纬度
def __init__(self):
super(PoswiseFeedForwardNet, self).__init__()
self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=d_ff, kernel_size=1)#512->2048
self.conv2 = nn.Conv1d(in_channels=d_ff, out_channels=d_model, kernel_size=1)#2048->512
self.layer_norm = nn.LayerNorm(d_model)#顺手进行norm操作
#一维卷积 (nn.Conv1d) 需要输入的数据形状为 [batch_size, in_channels, length]。具体来说:
# batch_size:批次大小,表示输入样本的数量。
# in_channels:输入通道数,表示输入特征的维度。
# length:输入序列的长度。
def forward(self, inputs):
residual = inputs # inputs : [batch_size, len_q, d_model]
output = nn.ReLU()(self.conv1(inputs.transpose(1, 2)))#这里因为一维卷积需要维度再前所以要交换特征位置
output = self.conv2(output).transpose(1, 2)
return self.layer_norm(output + residual)
class EncoderLayer(nn.Module):
def __init__(self):
super(EncoderLayer, self).__init__()
self.enc_self_attn = MultiHeadAttention() # 编码器自注意力层
self.pos_ffn = PoswiseFeedForwardNet() # 位置前馈神经网络
def forward(self, enc_inputs, enc_self_attn_mask):
enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, enc_self_attn_mask) # enc_inputs to same Q,K,V
enc_outputs = self.pos_ffn(enc_outputs) # enc_outputs: [batch_size x len_q x d_model]
return enc_outputs, attn
class DecoderLayer(nn.Module):
def __init__(self):
super(DecoderLayer, self).__init__()
self.dec_self_attn = MultiHeadAttention()
self.dec_enc_attn = MultiHeadAttention()
self.pos_ffn = PoswiseFeedForwardNet()
def forward(self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask):
dec_outputs, dec_self_attn = self.dec_self_attn(dec_inputs, dec_inputs, dec_inputs, dec_self_attn_mask)#自注意力层
dec_outputs, dec_enc_attn = self.dec_enc_attn(dec_outputs, enc_outputs, enc_outputs, dec_enc_attn_mask)#交互注意力层
dec_outputs = self.pos_ffn(dec_outputs)#前馈神经网络
return dec_outputs, dec_self_attn, dec_enc_attn
class Encoder(nn.Module):
def __init__(self):
super(Encoder, self).__init__()
# 定义编码器中的词嵌入层
self.src_emb = nn.Embedding(src_vocab_size, d_model)
# 定义编码器中的位置编码层,从预训练的sinusoid编码表中获取
self.pos_emb = nn.Embedding.from_pretrained(get_sinusoid_encoding_table(src_len+1, d_model),freeze=True)
# 定义编码器中的层
self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])
def forward(self, enc_inputs): # enc_inputs : [batch_size x source_len]
# 将输入的词转换为词嵌入
a=self.src_emb(enc_inputs)
print(a)
print(a.shape)
enc_outputs = self.src_emb(enc_inputs) + self.pos_emb(torch.LongTensor([[1,2,3,4,0]]))
print("enc_outputs:",enc_outputs)
# 获取编码器自注意力时的mask
enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs)
enc_self_attns = []
# 初始化编码器自注意力列表
print("enc_self_attn_mask:",enc_self_attn_mask.shape)
# 遍历编码器中的每一层
for layer in self.layers:
# 将输出传递给下一层
enc_outputs, enc_self_attn = layer(enc_outputs, enc_self_attn_mask)
# 将编码器自注意力添加到自注意力列表中
enc_self_attns.append(enc_self_attn)
# 返回编码器的输出和自注意力列表
return enc_outputs, enc_self_attns
class Decoder(nn.Module):
def __init__(self):
super(Decoder, self).__init__()
self.tgt_emb = nn.Embedding(tgt_vocab_size, d_model)
self.pos_emb = nn.Embedding.from_pretrained(get_sinusoid_encoding_table(tgt_len+1, d_model),freeze=True)
self.layers = nn.ModuleList([DecoderLayer() for _ in range(n_layers)])
def forward(self, dec_inputs, enc_inputs, enc_outputs): # dec_inputs : [batch_size x target_len]
dec_outputs = self.tgt_emb(dec_inputs) + self.pos_emb(torch.LongTensor([[5,1,2,3,4]]))#和encoder的输入一样,需要加上位置编码
dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs)#解码端的自注意力mask 也就是告诉输入的pad符号不参与自注意力计算 并且告诉哪里是pad符号
dec_self_attn_subsequent_mask = get_attn_subsequent_mask(dec_inputs)#得到decoder输入的上三角mask矩阵
dec_self_attn_mask = torch.gt((dec_self_attn_pad_mask + dec_self_attn_subsequent_mask), 0)#当和pad矩阵两者相加就会得到我们要的矩阵大于0的部分置为1 小于等于0 的部分置为0
dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs)#交叉注意力层的mask矩阵 它的输入Quer来自于Masked Multi-Head Attention的输出,Keys和Values来自于Encoder中最后一层的输出。所以dec_input输入的是q enc_inputs输入的是k v
dec_self_attns, dec_enc_attns = [], []
for layer in self.layers:
dec_outputs, dec_self_attn, dec_enc_attn = layer(dec_outputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask)
dec_self_attns.append(dec_self_attn)
dec_enc_attns.append(dec_enc_attn)
return dec_outputs, dec_self_attns, dec_enc_attns
# 定义Transformer类,继承自nn.Module
class Transformer(nn.Module):
# 初始化函数
def __init__(self):
# 调用父类的初始化函数
super(Transformer, self).__init__()
# 定义编码器
self.encoder = Encoder()
# 定义解码器
self.decoder = Decoder()
# 定义一个线性层,将d_model个神经元的编码器输出转化为目标词汇表大小个神经元的输出,不添加偏置
self.projection = nn.Linear(d_model, tgt_vocab_size, bias=False)
# 前向传播函数
def forward(self, enc_inputs, dec_inputs):
# 调用编码器
enc_outputs, enc_self_attns = self.encoder(enc_inputs)
# 调用解码器
dec_outputs, dec_self_attns, dec_enc_attns = self.decoder(dec_inputs, enc_inputs, enc_outputs)#enc_inputs是告诉解码端那些是pad符号
# 调用线性层
dec_logits = self.projection(dec_outputs) # dec_logits : [batch_size x src_vocab_size x tgt_vocab_size]
# 返回解码 logits
return dec_logits.view(-1, dec_logits.size(-1)), enc_self_attns, dec_self_attns, dec_enc_attns
def showgraph(attn):
attn = attn[-1].squeeze(0)[0]
attn = attn.squeeze(0).data.numpy()
fig = plt.figure(figsize=(n_heads, n_heads)) # [n_heads, n_heads]
ax = fig.add_subplot(1, 1, 1)
ax.matshow(attn, cmap='viridis')
ax.set_xticklabels(['']+sentences[0].split(), fontdict={'fontsize': 14}, rotation=90)
ax.set_yticklabels(['']+sentences[2].split(), fontdict={'fontsize': 14})
plt.show()
if __name__ == '__main__':
sentences = ['ich mochte ein bier P', 'S i want a beer', 'i want a beer E']
# Transformer Parameters
# Padding Should be Zero
src_vocab = {'P': 0, 'ich': 1, 'mochte': 2, 'ein': 3, 'bier': 4}
src_vocab_size = len(src_vocab)
tgt_vocab = {'P': 0, 'i': 1, 'want': 2, 'a': 3, 'beer': 4, 'S': 5, 'E': 6}
number_dict = {i: w for i, w in enumerate(tgt_vocab)}
tgt_vocab_size = len(tgt_vocab)
src_len = 5 # length of source
tgt_len = 5 # length of target
d_model = 512 # Embedding Size 词嵌入的维度 可以理解为一个词有512个维度 它被分解成了具有一个512维度特征的向量 更浅显易懂的说就是把“我”这个词和512种不同的词作比较从而得到向量
d_ff = 2048 # 这个先不管 是前馈神经网络所需要提高的维度
d_k = d_v = 64 # 特征k v 的特征维度
n_layers = 6 # number of Encoder of Decoder Layer
n_heads = 8 # 把qkv所需要分的头数 但是要注意 n_heads * d_k = d_model 为什么?因为输入的d_model被划分成8个子空间(也就是头数)这意味着每个头输出的维度是64计算完成后拼接起来
model = Transformer()# Transformer模型
criterion = nn.CrossEntropyLoss()#交叉熵损失函数
optimizer = optim.Adam(model.parameters(), lr=0.001)# 优化器
enc_inputs, dec_inputs, target_batch = make_batch(sentences)
print("sentences"+str(sentences))
print("make_batch:"+str(enc_inputs.shape))
print("make_batch:"+str(enc_inputs))
print("make_batch:"+str(dec_inputs.shape))
print("make_batch:"+str(dec_inputs))
for epoch in range(20):
optimizer.zero_grad()
outputs, enc_self_attns, dec_self_attns, dec_enc_attns = model(enc_inputs, dec_inputs)
loss = criterion(outputs, target_batch.contiguous().view(-1))
print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))
loss.backward()
optimizer.step()
# Test
predict, _, _, _ = model(enc_inputs, dec_inputs)
predict = predict.data.max(1, keepdim=True)[1]
print(sentences[0], '->', [number_dict[n.item()] for n in predict.squeeze()])
print('first head of last state enc_self_attns')
showgraph(enc_self_attns)
print('first head of last state dec_self_attns')
showgraph(dec_self_attns)
print('first head of last state dec_enc_attns')
showgraph(dec_enc_attns)