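1. Implementing the GPT Transformer Architecture from Scratch
In recent years large models have advanced rapidly and become a central research topic in artificial intelligence. Their strong language understanding and generation abilities have produced notable results in natural language processing, machine translation, text generation, and many other areas. All of this rests on one core architecture: the Transformer.
The Transformer is a deep neural network built on self-attention, whose central idea is to capture long-range dependencies within a sequence. When processing each token, self-attention lets the model look at every other token in the sequence and weight it by how strongly the two are related. This removes the limitations that recurrent neural networks (RNN) and long short-term memory networks (LSTM) face on long sequences, and because all positions are processed in parallel, the Transformer handles large-scale data more efficiently.
This article uses only PyTorch to build the network, construct a vocabulary, and train a GPT dialogue model from scratch, taking a custom dialogue model from 0 to 1. The model uses a decoder-only Transformer as its core architecture: token embeddings plus position encodings are fed into a stack of identical layers, each made up of a self-attention sublayer and a feed-forward network. The final result is shown below: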
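2. Building the Model
2.1 Scaled Dot-Product Attention Layer
Attention is computed as follows:
$$\mathrm{Attention}(Q, K, V) = \mathrm{softmax}\left(\frac{QK^{T}}{\sqrt{d_k}}\right)V$$
First, the input passes through three different linear transformations to produce three matrices: the query (Q), the key (K), and the value (V).
Next, the dot product of Q with every key K gives the attention scores, which are then divided by the square root of d_k, the dimension of the key vectors. Without this scaling the dot products grow with d_k and push Softmax into regions where its gradients are very small, so dividing keeps training numerically stable.
The scores are then normalized with Softmax so that each row sums to 1. Each score becomes a probability that indicates how much weight the current element gives to each of the other elements.
Finally, the Softmax probabilities are multiplied by the value (V) to give the output of the self-attention layer.
Pay attention to the attention mask. Input sequences can differ in length, but matrix computation needs a fixed size, so shorter sequences are filled with padding tokens. Padding carries no information and computing attention scores for it is pointless, so the scores at padding positions are set to a very large negative value, which makes them essentially 0 after softmax.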
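To make the four steps concrete, here is a minimal sketch in plain Python (no PyTorch) that computes masked scaled dot-product attention for a single head on tiny hand-written matrices. The function names and the toy values are assumptions made for illustration only; the article's actual implementation is the PyTorch class in this section.

```python
import math


def softmax(row):
    # Subtract the row maximum before exponentiating for numerical stability.
    m = max(row)
    exps = [math.exp(x - m) for x in row]
    total = sum(exps)
    return [e / total for e in exps]


def scaled_dot_product_attention(q, k, v, mask=None):
    """q: [len_q][d_k], k: [len_k][d_k], v: [len_k][d_v].
    mask[i][j] is True where key j must be hidden from query i."""
    d_k = len(k[0])
    weights = []
    for i, q_row in enumerate(q):
        # Dot product of one query with every key, scaled by sqrt(d_k).
        scores = [sum(a * b for a, b in zip(q_row, k_row)) / math.sqrt(d_k) for k_row in k]
        if mask is not None:
            # Masked positions get a large negative score, so softmax gives them ~0 weight.
            scores = [-1e9 if mask[i][j] else s for j, s in enumerate(scores)]
        weights.append(softmax(scores))
    # Each output row is the attention-weighted sum of the value rows.
    d_v = len(v[0])
    context = [[sum(w[j] * v[j][c] for j in range(len(v))) for c in range(d_v)] for w in weights]
    return context, weights


q = [[1.0, 0.0], [0.0, 1.0]]
k = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
v = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
# Pretend the third key is a padding position for both queries.
pad_mask = [[False, False, True], [False, False, True]]
context, weights = scaled_dot_product_attention(q, k, v, pad_mask)
```

Every row of `weights` sums to 1, and the column that corresponds to the padded key receives a weight of effectively zero.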
The implementation is as follows:
import numpy as np
import torch
import torch.nn as nn


class ScaledDotProductAttention(nn.Module):
    def __init__(self, d_k):
        super(ScaledDotProductAttention, self).__init__()
        self.d_k = d_k

    def forward(self, q, k, v, attention_mask):
        # q: [batch_size, n_heads, len_q, d_k]
        # k: [batch_size, n_heads, len_k, d_k]
        # v: [batch_size, n_heads, len_v, d_v]
        # attention_mask: [batch_size, n_heads, len_q, len_k], bool, True = masked
        # Score of every query against every key: [batch_size, n_heads, len_q, len_k]
        scores = torch.matmul(q, k.transpose(-1, -2)) / np.sqrt(self.d_k)
        # Fill masked positions with a large negative value so that softmax
        # gives them a weight of ~0 and they have no effect on q
        scores.masked_fill_(attention_mask, -1e9)
        attn = nn.Softmax(dim=-1)(scores)
        # Output after attention: [batch_size, n_heads, len_q, d_v]
        context = torch.matmul(attn, v)
        return context, attn
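2.2 Multi-Head Attention Layer
Multi-head attention builds on single-head attention by splitting Q, K, and V into several heads that are processed in parallel. Each head can learn different features of the sequence, which strengthens the model's feature extraction.
The outputs of all heads are concatenated and passed through a linear layer that maps them back to the input dimension. The result is added to the original input as a residual connection and then normalized with layer normalization (LN).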
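The split and the later concatenation are just a regrouping of the last dimension. The sketch below shows this on nested Python lists for one sequence; `split_heads` and `merge_heads` are hypothetical helper names used only here, standing in for the `view`/`transpose` and `transpose`/`reshape` calls a PyTorch implementation would typically use.

```python
def split_heads(x, n_heads):
    """x: [seq_len][n_heads * d_head] -> [n_heads][seq_len][d_head]."""
    d_head = len(x[0]) // n_heads
    return [[row[h * d_head:(h + 1) * d_head] for row in x] for h in range(n_heads)]


def merge_heads(heads):
    """[n_heads][seq_len][d_head] -> [seq_len][n_heads * d_head]."""
    seq_len = len(heads[0])
    return [[value for head in heads for value in head[t]] for t in range(seq_len)]


# Three positions, feature size 4, split into 2 heads of size 2.
x = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]]
heads = split_heads(x, n_heads=2)
restored = merge_heads(heads)
```

Head 0 sees the first half of every position's features and head 1 the second half; merging the heads restores the original layout exactly.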
The implementation is as follows:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, n_heads, d_k, d_v):
        super(MultiHeadAttention, self).__init__()
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_k
        self.d_v = d_v
        self.w_q = nn.Linear(d_model, d_k * n_heads, bias=False)
        self.w_k = nn.Linear(d_model, d_k * n_heads, bias=False)
        self.w_v = nn.Linear(d_model, d_v * n_heads, bias=False)
        self.fc = nn.Linear(n_heads * d_v, d_model, bias=False)
        self.layernorm = nn.LayerNorm(d_model)

    def forward(self, q, k, v, attention_mask):
        # q: [batch_size, seq_len, d_model]
        # k: [batch_size, seq_len, d_model]
        # v: [batch_size, seq_len, d_model]
        # attention_mask: [batch_size, seq_len, seq_len]
        # Keep the original input for the residual connection
        residual, batch_size = q, q.size(0)
        # Project q, k, v first, then split into heads
        # q: [batch_size, n_heads, len_q, d_k]
        q = self.w_q(q).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        # k: [batch_size, n_heads, len_k, d_k]
        k = self.w_k(k).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        # v: [batch_size, n_heads, len_v(=len_k), d_v]
        v = self.w_v(v).view(batch_size, -1, self.n_heads, self.d_v).transpose(1, 2)
        # Repeat the mask for every head: [batch_size, n_heads, seq_len, seq_len]
        attention_mask = attention_mask.unsqueeze(1).repeat(1, self.n_heads, 1, 1)
        # Scaled dot-product attention, context: [batch_size, n_heads, len_q, d_v]
        context, attn = ScaledDotProductAttention(self.d_k)(q, k, v, attention_mask)
        # Concatenate the heads: [batch_size, len_q, n_heads * d_v]
        context = context.transpose(1, 2).reshape(batch_size, -1, self.n_heads * self.d_v)
        # Map back to the original size: [batch_size, len_q, d_model]
        output = self.fc(context)
        # Residual connection + LN
        return self.layernorm(output + residual), attn
2.3 Feed-Forward Network Layer
The feed-forward layer is simple: two fully connected linear layers joined by a ReLU activation. It first expands the dimension (d_model to d_ff) and then projects it back, which lets the model learn more abstract features.
The implementation is as follows:
class PoswiseFeedForwardNet(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PoswiseFeedForwardNet, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(d_model, d_ff, bias=False),
            nn.ReLU(),
            nn.Linear(d_ff, d_model, bias=False)
        )
        self.layernorm = nn.LayerNorm(d_model)

    def forward(self, inputs):
        # inputs: [batch_size, seq_len, d_model]
        residual = inputs
        output = self.fc(inputs)
        # Residual connection + LN, [batch_size, seq_len, d_model]
        return self.layernorm(output + residual)
2.4 Decoder Layer
With multi-head attention and the feed-forward network in place, the decoder layer can now be built. One decoder layer consists of one multi-head attention layer followed by one feed-forward network layer.
The implementation is as follows:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, d_k, d_v):
        super(DecoderLayer, self).__init__()
        # Multi-head attention layer
        self.attention = MultiHeadAttention(d_model, n_heads, d_k, d_v)
        # Feed-forward network layer
        self.pos_ffn = PoswiseFeedForwardNet(d_model, d_ff)

    def forward(self, inputs, attention_mask):
        # inputs: [batch_size, seq_len, d_model]
        # attention_mask: [batch_size, seq_len, seq_len]
        # outputs: [batch_size, seq_len, d_model]
        # self_attn: [batch_size, n_heads, seq_len, seq_len]
        outputs, self_attn = self.attention(inputs, inputs, inputs, attention_mask)
        # [batch_size, seq_len, d_model]
        outputs = self.pos_ffn(outputs)
        return outputs, self_attn
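2.5 Decoder
The decoder stacks several decoder layers into a feature extraction pipeline. It first takes the input tokens and converts them to high-dimensional vectors with an Embedding. Because attention itself carries no position information, a position encoding has to be added as well.
Position encoding here follows GPT2 and simply learns a second Embedding over the positions. You can also swap in fixed position encoding or rotary position encoding as an experiment.
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_pos, device):
        super(PositionalEncoding, self).__init__()
        self.device = device
        self.pos_embedding = nn.Embedding(max_pos, d_model)

    def forward(self, inputs):
        # inputs: [batch_size, seq_len]
        seq_len = inputs.size(1)
        pos = torch.arange(seq_len, dtype=torch.long, device=self.device)
        # [seq_len] -> [batch_size, seq_len]
        pos = pos.unsqueeze(0).expand_as(inputs)
        return self.pos_embedding(pos)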
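For readers who want to try the fixed alternative mentioned above, one common choice is the sinusoidal table from the original Transformer design. The plain-Python sketch below is a swapped-in technique and not what this article trains with; the function name `sinusoidal_positions` is hypothetical and `d_model` is assumed to be even.

```python
import math


def sinusoidal_positions(max_pos, d_model):
    """Return a [max_pos][d_model] table of fixed position encodings (d_model assumed even)."""
    table = []
    for pos in range(max_pos):
        row = []
        for i in range(0, d_model, 2):
            # i is the even dimension index, so the exponent is i / d_model.
            angle = pos / (10000 ** (i / d_model))
            row.append(math.sin(angle))  # even dimension
            row.append(math.cos(angle))  # odd dimension
        table.append(row)
    return table


table = sinusoidal_positions(max_pos=4, d_model=4)
```

Unlike the learned Embedding, this table has no trainable parameters and would be added to the token embeddings as a constant.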
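For a Transformer decoder, decoding should be autoregressive: each step predicts the next token from what came before. This means that when generating the i-th element of a sequence, the model can only see information before position i. In the score matrix, row i may therefore attend to columns 1 through i and nothing later. During training the future positions have to be masked so the model cannot see them, which is easy to do with an upper-triangular mask.
For example:
Raw attention score matrix (no mask):
[[q1k1, q1k2, q1k3, q1k4],
[q2k1, q2k2, q2k3, q2k4],
[q3k1, q3k2, q3k3, q3k4],
[q4k1, q4k2, q4k3, q4k4]]
Upper-triangular mask:
[[0, 1, 1, 1],
[0, 0, 1, 1],
[0, 0, 0, 1],
[0, 0, 0, 0]]
Score matrix after applying the mask:
[[q1k1, -inf, -inf, -inf],
[q2k1, q2k2, -inf, -inf],
[q3k1, q3k2, q3k3, -inf],
[q4k1, q4k2, q4k3, q4k4]]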
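The three matrices above can be reproduced with a few lines of plain Python. This is only an illustrative sketch with made-up scores; `causal_mask` and `apply_mask` are hypothetical names, and the PyTorch code in this article fills with -1e9 rather than a true -inf.

```python
def causal_mask(seq_len):
    """mask[i][j] is 1 where position j lies in the future of position i (j > i)."""
    return [[1 if j > i else 0 for j in range(seq_len)] for i in range(seq_len)]


def apply_mask(scores, mask, fill=float("-inf")):
    """Replace every masked score with `fill` and keep the rest unchanged."""
    return [[fill if m else s for s, m in zip(s_row, m_row)]
            for s_row, m_row in zip(scores, mask)]


mask = causal_mask(4)
# Made-up scores standing in for the q_i k_j products.
scores = [[0.1 * (i + j) for j in range(4)] for i in range(4)]
masked = apply_mask(scores, mask)
```

The last row is left untouched because the final position is allowed to see the whole sequence.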
Implementation:
def get_attn_subsequence_mask(seq, device):
    # Attention scores have shape [batch_size, n_heads, len_seq, len_seq],
    # so the mask is built with shape [batch_size, len_seq, len_seq]
    attn_shape = [seq.size(0), seq.size(1), seq.size(1)]
    # Upper-triangular matrix: 1 strictly above the diagonal marks future positions
    subsequence_mask = np.triu(np.ones(attn_shape), k=1)
    subsequence_mask = torch.from_numpy(subsequence_mask).byte()
    subsequence_mask = subsequence_mask.to(device)
    return subsequence_mask
The padding attention_mask also has to be resized to [batch_size, len_seq, len_seq] so that it lines up with the attention scores:
def get_attn_pad_mask(attention_mask):
    batch_size, len_seq = attention_mask.size()
    # True where the token is padding: [batch_size, 1, len_seq]
    attention_mask = attention_mask.data.eq(0).unsqueeze(1)
    # Attention scores have shape [batch_size, n_heads, len_q, len_k],
    # so expand to [batch_size, len_seq, len_seq]
    return attention_mask.expand(batch_size, len_seq, len_seq)
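A position ends up hidden when either the padding mask or the upper-triangular mask hides it. The plain-Python sketch below shows that combination for a single sequence of four tokens whose last token is padding; the helper names are hypothetical and the lists stand in for the tensors used in the article.

```python
def pad_mask(attention_mask):
    """attention_mask: 1 for real tokens, 0 for padding.
    Returns [seq_len][seq_len] with True in every padded key column."""
    seq_len = len(attention_mask)
    row = [m == 0 for m in attention_mask]
    return [list(row) for _ in range(seq_len)]


def causal_mask(seq_len):
    """True where key position j lies in the future of query position i."""
    return [[j > i for j in range(seq_len)] for i in range(seq_len)]


def combine(a, b):
    """A position is hidden if either mask hides it."""
    return [[x or y for x, y in zip(row_a, row_b)] for row_a, row_b in zip(a, b)]


attention_mask = [1, 1, 1, 0]  # three real tokens followed by one pad
combined = combine(pad_mask(attention_mask), causal_mask(4))
```

Note that the last row still masks its own padded column, so the padded query position attends only to the three real tokens.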
The decoder can now be built. Implementation:
class Decoder(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, d_k, d_v, vocab_size, max_pos, n_layers, device):
        super(Decoder, self).__init__()
        self.device = device
        # Convert tokens to vectors
        self.embedding = nn.Embedding(vocab_size, d_model)
        # Position encoding
        self.pos_encoding = PositionalEncoding(d_model, max_pos, device)
        self.layers = nn.ModuleList([DecoderLayer(d_model, n_heads, d_ff, d_k, d_v) for _ in range(n_layers)])

    def forward(self, inputs, attention_mask):
        # inputs: [batch_size, seq_len]
        # [batch_size, seq_len, d_model]
        outputs = self.embedding(inputs) + self.pos_encoding(inputs)
        # Upper-triangular mask that hides future positions, [batch_size, seq_len, seq_len]
        subsequence_mask = get_attn_subsequence_mask(inputs, self.device)
        if attention_mask is not None:
            # Padding mask, [batch_size, seq_len, seq_len]
            attention_mask = get_attn_pad_mask(attention_mask)
            # A position is masked if either mask marks it, [batch_size, seq_len, seq_len]
            attention_mask = torch.gt((attention_mask + subsequence_mask), 0)
        else:
            attention_mask = subsequence_mask.bool()
        # Run every layer in turn
        self_attns = []
        for layer in self.layers:
            # outputs: [batch_size, seq_len, d_model]
            # self_attn: [batch_size, n_heads, seq_len, seq_len]
            outputs, self_attn = layer(outputs, attention_mask)
            self_attns.append(self_attn)
        return outputs, self_attns
2.6 Building the GPT Model
The decoder built above produces the processed features. These still have to be mapped to scores over the vocabulary (which softmax turns into a probability distribution) before the next token can be predicted.
Implementation:
class GPTModel(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, d_k, d_v, vocab_size, max_pos, n_layers, device):
        super(GPTModel, self).__init__()
        # Decoder
        self.decoder = Decoder(d_model, n_heads, d_ff, d_k, d_v, vocab_size, max_pos, n_layers, device)
        # Project to the vocabulary size
        self.projection = nn.Linear(d_model, vocab_size)

    def forward(self, inputs, attention_mask=None):
        # inputs: [batch_size, seq_len]
        # outputs: [batch_size, seq_len, d_model]
        # self_attns: [n_layers, batch_size, n_heads, seq_len, seq_len]
        outputs, self_attns = self.decoder(inputs, attention_mask)
        # [batch_size, seq_len, vocab_size]
        logits = self.projection(outputs)
        # Flatten to [batch_size * seq_len, vocab_size] so the loss can be computed directly
        return logits.view(-1, logits.size(-1)), self_attns
The whole GPT model is now in place. Print it to see the network structure and the number of parameters:
import torch
from model import GPTModel


def main():
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    # Model hyperparameters
    model_param = {
        "d_model": 768,  # embedding size
        "d_ff": 2048,  # feed-forward hidden size
        "d_k": 64,  # size of K per head
        "d_v": 64,  # size of V per head
        "n_layers": 6,  # number of decoder layers
        "n_heads": 8,  # number of attention heads
        "max_pos": 1800,  # length of the position encoding
        "device": device,  # device
        "vocab_size": 4825  # vocabulary size
    }
    model = GPTModel(**model_param)
    total_params = sum(p.numel() for p in model.parameters())
    print(model)
    print("total_params: ", total_params)


if __name__ == '__main__':
    main()
Network structure:
GPTModel(
(decoder): Decoder(
(embedding): Embedding(4825, 768)
(pos_encoding): PositionalEncoding(
(pos_embedding): Embedding(1800, 768)
)
(layers): ModuleList(
(0): DecoderLayer(
(attention): MultiHeadAttention(
(w_q): Linear(in_features=768, out_features=512, bias=False)
(w_k): Linear(in_features=768, out_features=512, bias=False)
(w_v): Linear(in_features=768, out_features=512, bias=False)
(fc): Linear(in_features=512, out_features=768, bias=False)
(layernorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
)
(pos_ffn): PoswiseFeedForwardNet(
(fc): Sequential(
(0): Linear(in_features=768, out_features=2048, bias=False)
(1): ReLU()
(2): Linear(in_features=2048, out_features=768, bias=False)
)
(layernorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
)
)
(1): DecoderLayer(
(attention): MultiHeadAttention(
(w_q): Linear(in_features=768, out_features=512, bias=False)
(w_k): Linear(in_features=768, out_features=512, bias=False)
(w_v): Linear(in_features=768, out_features=512, bias=False)
(fc): Linear(in_features=512, out_features=768, bias=False)
(layernorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
)
(pos_ffn): PoswiseFeedForwardNet(
(fc): Sequential(
(0): Linear(in_features=768, out_features=2048, bias=False)
(1): ReLU()
(2): Linear(in_features=2048, out_features=768, bias=False)
)
(layernorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
)
)
(2): DecoderLayer(
(attention): MultiHeadAttention(
(w_q): Linear(in_features=768, out_features=512, bias=False)
(w_k): Linear(in_features=768, out_features=512, bias=False)
(w_v): Linear(in_features=768, out_features=512, bias=False)
(fc): Linear(in_features=512, out_features=768, bias=False)
(layernorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
)
(pos_ffn): PoswiseFeedForwardNet(
(fc): Sequential(
(0): Linear(in_features=768, out_features=2048, bias=False)
(1): ReLU()
(2): Linear(in_features=2048, out_features=768, bias=False)
)
(layernorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
)
)
(3): DecoderLayer(
(attention): MultiHeadAttention(
(w_q): Linear(in_features=768, out_features=512, bias=False)
(w_k): Linear(in_features=768, out_features=512, bias=False)
(w_v): Linear(in_features=768, out_features=512, bias=False)
(fc): Linear(in_features=512, out_features=768, bias=False)
(layernorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
)
(pos_ffn): PoswiseFeedForwardNet(
(fc): Sequential(
(0): Linear(in_features=768, out_features=2048, bias=False)
(1): ReLU()
(2): Linear(in_features=2048, out_features=768, bias=False)
)
(layernorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
)
)
(4): DecoderLayer(
(attention): MultiHeadAttention(
(w_q): Linear(in_features=768, out_features=512, bias=False)
(w_k): Linear(in_features=768, out_features=512, bias=False)
(w_v): Linear(in_features=768, out_features=512, bias=False)
(fc): Linear(in_features=512, out_features=768, bias=False)
(layernorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
)
(pos_ffn): PoswiseFeedForwardNet(
(fc): Sequential(
(0): Linear(in_features=768, out_features=2048, bias=False)
(1): ReLU()
(2): Linear(in_features=2048, out_features=768, bias=False)
)
(layernorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
)
)
(5): DecoderLayer(
(attention): MultiHeadAttention(
(w_q): Linear(in_features=768, out_features=512, bias=False)
(w_k): Linear(in_features=768, out_features=512, bias=False)
(w_v): Linear(in_features=768, out_features=512, bias=False)
(fc): Linear(in_features=512, out_features=768, bias=False)
(layernorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
)
(pos_ffn): PoswiseFeedForwardNet(
(fc): Sequential(
(0): Linear(in_features=768, out_features=2048, bias=False)
(1): ReLU()
(2): Linear(in_features=2048, out_features=768, bias=False)
)
(layernorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
)
)
)
)
(projection): Linear(in_features=768, out_features=4825, bias=True)
)
total_params: 37128409
As you can see, the model has only a little over 37 million parameters, so it is a small dialogue model at best.
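The printed total can be checked by hand from the layer shapes shown in the network structure. The short calculation below is a sketch of that arithmetic using the hyperparameters from this section; the variable names are chosen here for illustration.

```python
d_model, d_ff, d_k, d_v = 768, 2048, 64, 64
n_layers, n_heads, max_pos, vocab_size = 6, 8, 1800, 4825

token_embedding = vocab_size * d_model
position_embedding = max_pos * d_model

# Attention: w_q, w_k, w_v and the output projection fc, all without bias.
attention = (d_model * d_k * n_heads * 2
             + d_model * d_v * n_heads
             + n_heads * d_v * d_model)
# Feed-forward: two bias-free linear layers.
feed_forward = d_model * d_ff + d_ff * d_model
# Each LayerNorm has a weight and a bias of size d_model; there are two per layer.
layer_norms = 2 * (2 * d_model)
per_layer = attention + feed_forward + layer_norms

# The final projection is the only linear layer with a bias.
projection = d_model * vocab_size + vocab_size

total = token_embedding + position_embedding + n_layers * per_layer + projection
print(total)  # 37128409
```

Roughly three quarters of the parameters sit in the six decoder layers; the two embeddings and the output projection account for the rest.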
Next, build the vocabulary from the dataset.
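3. Building the Dataset Vocabulary
The dataset is the 对话-百科(中文) (Dialogue-Encyclopedia, Chinese) training set. It contains 274148 question-answer pairs covering food, cities, entrepreneurs, cars, celebrity gossip, everyday knowledge, and casual conversation.
Dataset download address: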
https://modelscope.cn/datasets/qiaojiedongfeng/qiaojiedongfeng/summary
The data format is as follows:
{"question": "你在阐述观点时能否提供一些具体的论据或实例,以便我更容易识别潜在的弱点或反证点?", "answer": "当然可以!当我提出一个观点时,我会举例说明,并引用相关数据、研究或经验来支持我的观点。这样做可以帮助识别可能存在的反对意见或反驳的途径。"}
{"question": "你最近有没有阅读任何书籍?", "answer": "最近我在读《人类简史》,探索历史的演变和人类社会的发展。"}
{"question": "哪种具有特定特性的常见物品经常用于储存和分发水?", "answer": "塑料瓶"}
{"question": "请问北京市的地理覆盖范围是哪些区域?", "answer": "北京市包括东城区、西城区、朝阳区、丰台区、石景山区、海淀区、门头沟区、房山区、通州区、顺义区、昌平区、大兴区、怀柔区、平谷区、密云区和延庆区。"}
{"question": "你能分享一些关于刘德华的有趣故事吗?", "answer": "当然可以!有一次,在拍摄电影《无间道》时,刘德华为了完美呈现角色,长时间沉浸在戏中无法自拔,结果在现实生活中也展现出了一种‘失忆’的状态。他还曾在颁奖典礼上不慎摔伤了手指,但仍然坚持完成表演,这让他获得了'坚强艺人'的称号。"}
{"question": "你能给我一些建议,有哪些美食值得一试?","answer": "尝试日本寿司、意大利披萨、法国鹅肝、泰国绿咖喱和韩国石锅拌饭,每种都有独特的风味,令人回味无穷。"}
{"question": "谁是小说《悲惨世界》的创作者?", "answer": "维克多·雨果"}
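To build the vocabulary, each character is treated as one token. This could be improved by running a word segmenter first and building the vocabulary from the resulting words. Note that three special tokens must be added to the vocabulary, each with its own meaning: pad for padding, unk for unknown characters, and sep for the end of a segment.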
import json


def build_vocab(file_path):
    # Read all texts
    texts = []
    with open(file_path, 'r', encoding='utf-8') as r:
        for line in r:
            # Skip blank lines; a line read from a file still ends with "\n", so strip first
            if not line.strip():
                continue
            line = json.loads(line)
            question = line["question"]
            answer = line["answer"]
            texts.append(question)
            texts.append(answer)
    # Split into tokens: one character is one token
    words = set()
    for t in texts:
        if not t:
            continue
        for word in t.strip():
            words.add(word)
    words = list(words)
    words.sort()
    # Special tokens
    # pad: padding, unk: unknown, sep: end of segment
    word2id = {"<pad>": 0, "<unk>": 1, "<sep>": 2}
    # Build the vocabulary; regular tokens start after the special ones
    word2id.update({word: i + len(word2id) for i, word in enumerate(words)})
    id2word = list(word2id.keys())
    vocab = {"word2id": word2id, "id2word": id2word}
    vocab = json.dumps(vocab, ensure_ascii=False)
    with open('data/vocab.json', 'w', encoding='utf-8') as w:
        w.write(vocab)
    print(f"finish. words: {len(id2word)}")


if __name__ == '__main__':
    build_vocab("data/train.jsonl")
After processing, the vocabulary size is 4825. Its format is as follows:
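As a rough guide to the structure of the saved JSON, the sketch below applies the same character-level steps to two toy strings held in memory. The function name `build_toy_vocab` and the sample texts are made up for illustration; the real vocabulary is built from the dataset and has 4825 entries.

```python
import json


def build_toy_vocab(texts):
    # Collect every distinct character and sort for a reproducible order.
    chars = sorted({ch for text in texts for ch in text.strip()})
    # Special tokens take the first three ids.
    word2id = {"<pad>": 0, "<unk>": 1, "<sep>": 2}
    word2id.update({ch: i + len(word2id) for i, ch in enumerate(chars)})
    # id2word is a list, so a token id is simply an index into it.
    id2word = list(word2id.keys())
    return {"word2id": word2id, "id2word": id2word}


vocab = build_toy_vocab(["abc", "cab d"])
print(json.dumps(vocab, ensure_ascii=False))
```

`word2id` maps a character to its id and `id2word` maps the id back, so the two stay consistent by construction.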
Next, build a Tokenizer class to handle tokens during training and prediction:
import json


class Tokenizer():
    def __init__(self, vocab_path):
        with open(vocab_path, "r", encoding="utf-8") as r:
            vocab = r.read()
            if not vocab:
                raise Exception("词表读取为空!")
        vocab = json.loads(vocab)
        self.word2id = vocab["word2id"]
        self.id2word = vocab["id2word"]
        self.pad_token = self.word2id["<pad>"]
        self.unk_token = self.word2id["<unk>"]
        self.sep_token = self.word2id["<sep>"]

    def encode(self, text, text1=None, max_length=128, pad_to_max_length=False):
        # Layout: text <sep> [text1 <sep>]; characters not in the vocabulary map to <unk>
        tokens = [self.word2id.get(word, self.unk_token) for word in text]
        tokens.append(self.sep_token)
        if text1:
            tokens.extend([self.word2id.get(word, self.unk_token) for word in text1])
            tokens.append(self.sep_token)
        # 1 marks a real token, 0 marks padding
        att_mask = [1] * len(tokens)
        # Truncation and padding to max_length only happen when pad_to_max_length is set
        if pad_to_max_length:
            if len(tokens) > max_length:
                tokens = tokens[0:max_length]
                att_mask = att_mask[0:max_length]
            elif len(tokens) < max_length:
                tokens.extend([self.pad_token] * (max_length - len(tokens)))
                att_mask.extend([0] * (max_length - len(att_mask)))
        return tokens, att_mask

    def decode(self, token):
        # Accepts a single id or a list/tuple of ids
        if isinstance(token, (tuple, list)):
            return [self.id2word[n] for n in token]
        else:
            return self.id2word[token]

    def get_vocab_size(self):
        return len(self.id2word)
Usage example:
if __name__ == '__main__':
    tokenizer = Tokenizer(vocab_path="data/vocab.json")
    encode, att_mask = tokenizer.encode("你好,小毕超", "你好,小毕超", pad_to_max_length=True)
    decode = tokenizer.decode(encode)
    print("token lens: ", len(encode))
    print("encode: ", encode)
    print("att_mask: ", att_mask)
    print("decode: ", decode)
    print("vocab_size", tokenizer.get_vocab_size())
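With the vocabulary ready, the training and validation sets can be planned. The model built earlier has only a little over 37 million parameters, not even 0.1 B, so training it on more than 270,000 samples is admittedly a stretch, all the more so because its parameters are randomly initialized and trained from scratch. To see results quickly, the first 10000 samples are used for training and the next 1000 for validation: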
import os.path


def split_dataset(file_path, output_path):
    if not os.path.exists(output_path):
        os.mkdir(output_path)
    datas = []
    with open(file_path, "r", encoding='utf-8') as f:
        for line in f:
            # Skip blank lines; strip first because each line still ends with "\n"
            if not line.strip():
                continue
            datas.append(line)
    # First 10000 lines for training, the next 1000 for validation
    train = datas[0:10000]
    val = datas[10000:11000]
    with open(os.path.join(output_path, "train.json"), "w", encoding="utf-8") as w:
        for line in train:
            w.write(line)
            w.flush()
    with open(os.path.join(output_path, "val.json"), "w", encoding="utf-8") as w:
        for line in val:
            w.write(line)
            w.flush()
    print("train count: ", len(train))
    print("val count: ", len(val))


if __name__ == '__main__':
    file_path = "data/train.jsonl"
    split_dataset(file_path=file_path, output_path="data")
To give the custom model some character of its own, a few identity samples are appended to the training set:
{"question": "你是谁", "answer": "我是小毕超,一个简易的小助手"}
{"question": "你叫什么", "answer": "我是小毕超,一个简易的小助手"}
{"question": "你的名字是什么", "answer": "我是小毕超,一个简易的小助手"}
{"question": "你叫啥", "answer": "我是小毕超,一个简易的小助手"}
{"question": "你名字是啥", "answer": "我是小毕超,一个简易的小助手"}
{"question": "你是什么身份", "answer": "我是小毕超,一个简易的小助手"}
{"question": "你的全名是什么", "answer": "我是小毕超,一个简易的小助手"}
{"question": "你自称什么", "answer": "我是小毕超,一个简易的小助手"}
{"question": "你的称号是什么", "answer": "我是小毕超,一个简易的小助手"}
{"question": "你的昵称是什么", "answer": "我是小毕超,一个简易的小助手"}
Look at the distribution of token counts in train.json to choose the maximum sequence length (max_length):
import json
from tokenizer import Tokenizer
import matplotlib.pyplot as plt

# Use a font that can render the Chinese labels in the chart
plt.rcParams['font.sans-serif'] = ['SimHei']


def get_num_tokens(file_path, tokenizer):
    # Token count of every question-answer pair (no padding or truncation)
    input_num_tokens = []
    with open(file_path, "r", encoding="utf-8") as r:
        for line in r:
            line = json.loads(line)
            question = line["question"]
            answer = line["answer"]
            tokens, att_mask = tokenizer.encode(question, answer)
            input_num_tokens.append(len(tokens))
    return input_num_tokens


def count_intervals(num_tokens, interval):
    # Count how many samples fall into each [lower_bound, upper_bound) bucket
    max_value = max(num_tokens)
    intervals_count = {}
    for lower_bound in range(0, max_value + 1, interval):
        upper_bound = lower_bound + interval
        count = len([num for num in num_tokens if lower_bound <= num < upper_bound])
        intervals_count[f"{lower_bound}-{upper_bound}"] = count
    return intervals_count


def main():
    train_data_path = "data/train.json"
    tokenizer = Tokenizer("data/vocab.json")
    input_num_tokens = get_num_tokens(train_data_path, tokenizer)
    intervals_count = count_intervals(input_num_tokens, 20)
    print(intervals_count)
    x = [k for k, v in intervals_count.items()]
    y = [v for k, v in intervals_count.items()]
    plt.figure(figsize=(8, 6))
    bars = plt.bar(x, y)
    plt.title('训练集Token分布情况')
    plt.ylabel('数量')
    plt.xticks(rotation=45)
    # Write the count above each bar
    for bar in bars:
        yval = bar.get_height()
        plt.text(bar.get_x() + bar.get_width() / 2, yval, int(yval), va='bottom')
    plt.show()


if __name__ == '__main__':
    main()
Most samples fall within 120 tokens, so setting max_length to 120 for training covers the majority of the data.
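If you prefer a single number to reading the chart, the share of samples that fit under a candidate max_length can be computed directly from the list of token counts. This is a small sketch; the helper name `coverage` and the token counts below are made up for illustration and are not taken from the dataset.

```python
def coverage(num_tokens, max_length):
    """Fraction of samples whose token count fits within max_length."""
    if not num_tokens:
        return 0.0
    return sum(1 for n in num_tokens if n <= max_length) / len(num_tokens)


# Made-up token counts, purely for illustration.
lengths = [18, 35, 42, 60, 77, 95, 110, 119, 130, 210]
print(coverage(lengths, 120))  # 0.8
```

Samples longer than the chosen value are truncated by the tokenizer during training, so a higher coverage means less lost text at the cost of longer sequences.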
4. Model Training
4.1 Building the Dataset
qa_dataset.py
# -*- coding: utf-8 -*-
from torch.utils.data import Dataset
import torch
import json
import numpy as np


class QADataset(Dataset):
    def __init__(self, data_path, tokenizer, max_length) -> None:
        super().__init__()
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.data = []
        if data_path:
            with open(data_path, "r", encoding='utf-8') as f:
                for line in f:
                    # Skip blank lines; strip first because each line still ends with "\n"
                    if not line.strip():
                        continue
                    json_line = json.loads(line)
                    question = json_line["question"]
                    answer = json_line["answer"]
                    self.data.append({
                        "question": question,
                        "answer": answer
                    })
        print("data load , size:", len(self.data))

    def preprocess(self, question, answer):
        encode, att_mask = self.tokenizer.encode(question, answer, max_length=self.max_length, pad_to_max_length=True)
        # Next-token prediction: inputs drop the last token, labels drop the first
        input_ids = encode[:-1]
        att_mask = att_mask[:-1]
        labels = encode[1:]
        return input_ids, att_mask, labels

    def __getitem__(self, index):
        item_data = self.data[index]
        input_ids, att_mask, labels = self.preprocess(**item_data)
        return {
            "input_ids": torch.LongTensor(np.array(input_ids)),
            "attention_mask": torch.LongTensor(np.array(att_mask)),
            "labels": torch.LongTensor(np.array(labels))
        }

    def __len__(self):
        return len(self.data)
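The one-position shift in `preprocess` is what turns a token sequence into next-token training pairs. The toy example below shows it in plain Python with made-up token ids; `shift_for_next_token` is a hypothetical name used only in this sketch.

```python
PAD = 0


def shift_for_next_token(token_ids, att_mask):
    """Inputs drop the last token and labels drop the first,
    so labels[i] is the token that follows input_ids[i]."""
    return token_ids[:-1], att_mask[:-1], token_ids[1:]


# Toy ids: a 2-token question, <sep>=2, a 2-token answer, <sep>, then padding.
token_ids = [11, 12, 2, 21, 22, 2, PAD, PAD]
att_mask = [1, 1, 1, 1, 1, 1, 0, 0]
input_ids, mask, labels = shift_for_next_token(token_ids, att_mask)

# Positions whose label is PAD contribute nothing to a loss that ignores index 0.
scored = [(x, y) for x, y in zip(input_ids, labels) if y != PAD]
```

Only the pairs in `scored` would count toward the loss; the padded tail is skipped.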
4.2 Training
# -*- coding: utf-8 -*-
import torch
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from tokenizer import Tokenizer
from model import GPTModel
from qa_dataset import QADataset
from tqdm import tqdm
import time, sys, os


def train_model(model, train_loader, val_loader, optimizer, criterion,
                device, num_epochs, model_output_dir, writer):
    # Make sure the output directory exists before the first torch.save
    os.makedirs(model_output_dir, exist_ok=True)
    batch_step = 0
    best_val_loss = float('inf')
    for epoch in range(num_epochs):
        time1 = time.time()
        model.train()
        for index, data in enumerate(tqdm(train_loader, file=sys.stdout, desc="Train Epoch: " + str(epoch))):
            input_ids = data['input_ids'].to(device, dtype=torch.long)
            attention_mask = data['attention_mask'].to(device, dtype=torch.long)
            labels = data['labels'].to(device, dtype=torch.long)
            optimizer.zero_grad()
            # outputs: [batch_size * seq_len, vocab_size]
            outputs, dec_self_attns = model(input_ids, attention_mask)
            loss = criterion(outputs, labels.view(-1))
            loss.backward()
            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1)
            optimizer.step()
            writer.add_scalar('Loss/train', loss.item(), batch_step)
            batch_step += 1
            # Print the loss every 100 steps
            if index % 100 == 0 or index == len(train_loader) - 1:
                time2 = time.time()
                tqdm.write(
                    f"{index}, epoch: {epoch} -loss: {loss.item()} ; lr: {optimizer.param_groups[0]['lr']} ;each step's time spent: {(time2 - time1) / (index + 1)}")
        # Validation
        model.eval()
        val_loss = validate_model(model, criterion, device, val_loader)
        writer.add_scalar('Loss/val', val_loss, epoch)
        print(f"val loss: {val_loss} , epoch: {epoch}")
        # Save the best model so far
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_path = os.path.join(model_output_dir, "best.pt")
            print("Save Best Model To ", best_model_path, ", epoch: ", epoch)
            torch.save(model.state_dict(), best_model_path)
        # Save the current model
        last_model_path = os.path.join(model_output_dir, "last.pt")
        print("Save Last Model To ", last_model_path, ", epoch: ", epoch)
        torch.save(model.state_dict(), last_model_path)


def validate_model(model, criterion, device, val_loader):
    running_loss = 0.0
    with torch.no_grad():
        for _, data in enumerate(tqdm(val_loader, file=sys.stdout, desc="Validation Data")):
            input_ids = data['input_ids'].to(device, dtype=torch.long)
            attention_mask = data['attention_mask'].to(device, dtype=torch.long)
            labels = data['labels'].to(device, dtype=torch.long)
            outputs, dec_self_attns = model(input_ids, attention_mask)
            loss = criterion(outputs, labels.view(-1))
            running_loss += loss.item()
    # Average loss per batch
    return running_loss / len(val_loader)


def main():
    train_json_path = "data/train.json"  # training set
    val_json_path = "data/val.json"  # validation set
    vocab_path = "data/vocab.json"  # vocabulary location
    max_length = 120  # maximum sequence length
    epochs = 15  # number of epochs
    batch_size = 128  # batch size
    lr = 1e-4  # learning rate
    model_output_dir = "output"  # directory for saved models
    logs_dir = "logs"  # directory for logs
    # Device
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    # Load the tokenizer
    tokenizer = Tokenizer(vocab_path)
    # Model hyperparameters
    model_param = {
        "d_model": 768,  # embedding size
        "d_ff": 2048,  # feed-forward hidden size
        "d_k": 64,  # size of K per head
        "d_v": 64,  # size of V per head
        "n_layers": 6,  # number of decoder layers
        "n_heads": 8,  # number of attention heads
        "max_pos": 1800,  # length of the position encoding
        "device": device,  # device
        "vocab_size": tokenizer.get_vocab_size(),  # vocabulary size
    }
    model = GPTModel(**model_param)
    print("Start Load Train Data...")
    train_params = {
        "batch_size": batch_size,
        "shuffle": True,
        "num_workers": 4,
    }
    training_set = QADataset(train_json_path, tokenizer, max_length)
    training_loader = DataLoader(training_set, **train_params)
    print("Start Load Validation Data...")
    val_params = {
        "batch_size": batch_size,
        "shuffle": False,
        "num_workers": 4,
    }
    val_set = QADataset(val_json_path, tokenizer, max_length)
    val_loader = DataLoader(val_set, **val_params)
    # Logging
    writer = SummaryWriter(logs_dir)
    # Optimizer
    optimizer = torch.optim.AdamW(params=model.parameters(), lr=lr)
    # Loss function; ignore_index=0 skips positions whose label is <pad>
    criterion = torch.nn.CrossEntropyLoss(ignore_index=0).to(device)
    model = model.to(device)
    # Start training
    print("Start Training...")
    train_model(
        model=model,
        train_loader=training_loader,
        val_loader=val_loader,
        optimizer=optimizer,
        criterion=criterion,
        device=device,
        num_epochs=epochs,
        model_output_dir=model_output_dir,
        writer=writer
    )
    writer.close()


if __name__ == '__main__':
    main()
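The training loop clips gradients to a maximum norm of 1. The idea behind clipping by global norm can be sketched in plain Python as below: compute one L2 norm over all gradients together and scale everything down only when that norm exceeds the limit. The function name `clip_by_global_norm` and the small `eps` term are assumptions for this sketch, which illustrates the idea rather than reproducing PyTorch's code.

```python
import math


def clip_by_global_norm(grads, max_norm, eps=1e-6):
    """grads: list of gradient lists, one per parameter.
    Returns (clipped grads, norm before clipping)."""
    total_norm = math.sqrt(sum(g * g for grad in grads for g in grad))
    # The scale factor is capped at 1, so small gradients are left unchanged.
    scale = min(1.0, max_norm / (total_norm + eps))
    return [[g * scale for g in grad] for grad in grads], total_norm


grads = [[3.0, 0.0], [0.0, 4.0]]  # global L2 norm is 5
clipped, norm = clip_by_global_norm(grads, max_norm=1.0)
```

All gradients are scaled by the same factor, so the update direction is preserved and only its length is limited.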
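Training process:
With a batch size of 128, training takes only about 7 GB of GPU memory:
After training, view the loss curves with tensorboard:
After 15 epochs the training loss drops to about 1.31, and the validation loss reaches a minimum of about 3.16.
Next, try the model on some dialogue.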
5. Model Prediction
import torch
from model import GPTModel
from tokenizer import Tokenizer


def generate(model, tokenizer, text, max_length, device):
    # Encode the prompt; encode() appends <sep> after the question
    input_ids, att_mask = tokenizer.encode(text)
    input_ids = torch.tensor(input_ids, dtype=torch.long, device=device).unsqueeze(0)
    input_len = input_ids.size(1)
    # Inference only, so no gradients are needed
    with torch.no_grad():
        while True:
            # Force a closing <sep> once more than max_length tokens have been generated
            if input_ids.size(1) - input_len > max_length:
                next_symbol = tokenizer.sep_token
                input_ids = torch.cat(
                    [input_ids, torch.tensor([[next_symbol]], dtype=input_ids.dtype, device=device)], -1)
                break
            # logits: [seq_len, vocab_size] (batch of 1, already flattened by the model)
            logits, self_attns = model(input_ids)
            # Greedy decoding: take the most likely token at the last position
            next_symbol = logits[-1].argmax(dim=-1).item()
            input_ids = torch.cat(
                [input_ids, torch.tensor([[next_symbol]], dtype=input_ids.dtype, device=device)], -1)
            # Stop as soon as the model emits <sep>
            if next_symbol == tokenizer.sep_token:
                break
    decode = tokenizer.decode(input_ids[0].tolist())
    # Drop the prompt characters; what remains still includes the <sep> markers
    decode = decode[len(text):]
    return "".join(decode)


def main():
    model_path = "output/best.pt"
    vocab_path = "data/vocab.json"  # vocabulary location
    max_length = 128  # maximum number of generated tokens
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    # Load the tokenizer
    tokenizer = Tokenizer(vocab_path)
    # Model hyperparameters
    model_param = {
        "d_model": 768,  # embedding size
        "d_ff": 2048,  # feed-forward hidden size
        "d_k": 64,  # size of K per head
        "d_v": 64,  # size of V per head
        "n_layers": 6,  # number of decoder layers
        "n_heads": 8,  # number of attention heads
        "max_pos": 1800,  # length of the position encoding
        "device": device,  # device
        "vocab_size": tokenizer.get_vocab_size(),  # vocabulary size
    }
    model = GPTModel(**model_param)
    # map_location lets a checkpoint saved on GPU load on a CPU-only machine
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()
    while True:
        text = input("请输入:")
        if not text:
            continue
        if text == "q":
            break
        res = generate(model, tokenizer, text, max_length, device)
        print("AI: ", res)


if __name__ == '__main__':
    main()
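The generation loop is greedy decoding: at every step the highest-scoring token is appended until the model emits the sep token or a length limit is reached. The sketch below isolates that loop in plain Python. `toy_model` is a hypothetical stand-in for the trained network that follows a fixed script, and the token ids are made up.

```python
SEP = 2


def greedy_generate(next_token_scores, prompt_ids, max_new_tokens):
    """next_token_scores(ids) returns a list of scores over the vocabulary for the next token."""
    ids = list(prompt_ids)
    for _ in range(max_new_tokens):
        scores = next_token_scores(ids)
        # Greedy choice: take the id with the highest score.
        next_id = max(range(len(scores)), key=scores.__getitem__)
        ids.append(next_id)
        if next_id == SEP:
            break
    # Return only the newly generated part.
    return ids[len(prompt_ids):]


def toy_model(ids):
    """Hypothetical stand-in for the network: emits 5, then 6, then <sep>."""
    script = {0: 5, 1: 6}
    target = script.get(len(ids) - 3, SEP)
    return [1.0 if i == target else 0.0 for i in range(8)]


prompt = [3, 4, SEP]
answer = greedy_generate(toy_model, prompt, max_new_tokens=10)
print(answer)  # [5, 6, 2]
```

Greedy decoding is deterministic, so the same prompt always yields the same answer; sampling strategies would be a separate design choice.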
Prediction results: