Table of Contents
- 1. GPT-2 Implementation Steps
- 2. Source Code

1. GPT-2 Implementation Steps
Developing a machine-learning model generally involves the following steps:
1. Following the model's network architecture, implement each layer (Layer/Block);
2. Combine the components from step 1 into the complete Model;
3. Define the single-step training function (train_step), the loss function, the optimizer, and the metric functions (e.g., accuracy), so that one training step can be executed;
4. Define the training loop (train_loop), which repeatedly calls the functions from step 3 to train over multiple epochs and batches;
5. Define an evaluation function (evaluate) to test whether the trained model behaves as expected;
6. Build the training data (train, validation, test splits), feed it into the model, and start training.
2. Source Code
The GPT network structure is shown in the figure below:
For a detailed analysis of the figure above, see my earlier article (https://blog.csdn.net/liuqiker/article/details/130782918?spm=1001.2014.3001.5501); it is not repeated here.
The source code is as follows:
Import packages
```python
import tensorflow as tf
import tensorflow.keras as keras
import matplotlib.pyplot as plt
import numpy as np
import urllib.request
import zipfile
from IPython import display
import time
```
Positional encoding function
```python
def positional_encoding(length, depth):
    depth = depth / 2

    positions = np.arange(length)[:, np.newaxis]      # (seq, 1)
    depths = np.arange(depth)[np.newaxis, :] / depth  # (1, depth)

    angle_rates = 1 / (10000**depths)                 # (1, depth)
    angle_rads = positions * angle_rates              # (pos, depth)

    pos_encoding = np.concatenate(
        [np.sin(angle_rads), np.cos(angle_rads)],
        axis=-1)

    return tf.cast(pos_encoding, dtype=tf.float32)
```
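As a quick sanity check (not part of the original code), the function returns a `(length, depth)` table that will later be added to the token embeddings:

```python
# Sanity check: a positional-encoding table for 50 positions and model depth 128
pos = positional_encoding(length=50, depth=128)
print(pos.shape)  # (50, 128)
```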
Define the PositionalEmbedding layer
```python
class PositionalEmbedding(tf.keras.layers.Layer):
    def __init__(self, vocab_size, d_model, max_seq_len=100, dropout_rate=0.1):
        super().__init__()
        self.d_model = d_model
        # vocab_size is the size of the whole vocabulary; every input token id must be smaller than vocab_size
        self.embedding = tf.keras.layers.Embedding(vocab_size, d_model, mask_zero=True)
        self.pos_encoding = positional_encoding(length=max_seq_len, depth=d_model)
        self.dropout = tf.keras.layers.Dropout(dropout_rate)

    def compute_mask(self, *args, **kwargs):
        return self.embedding.compute_mask(*args, **kwargs)

    def call(self, x):
        length = tf.shape(x)[1]  # x.shape is [None, length]
        x = self.embedding(x)
        # This factor sets the relative scale of the embedding and positional_encoding.
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x = x + self.pos_encoding[tf.newaxis, :length, :]
        x = self.dropout(x)
        return x
```
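A minimal smoke test of the layer, with a made-up vocabulary size and token ids (illustrative only, not part of the original code):

```python
# Smoke test with a hypothetical vocabulary of 1000 tokens; 0 is the padding id (mask_zero=True)
embed = PositionalEmbedding(vocab_size=1000, d_model=128, max_seq_len=50)
dummy_ids = tf.constant([[2, 7, 15, 0, 0]])  # (batch=1, seq_len=5)
print(embed(dummy_ids).shape)                # (1, 5, 128)
```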
Define the Multi-Head Attention + Add & Norm layer
```python
class CausalSelfAttention(tf.keras.layers.Layer):
    def __init__(self, **kwargs):
        super().__init__()
        self.mha = tf.keras.layers.MultiHeadAttention(**kwargs)
        self.layernorm = tf.keras.layers.LayerNormalization()
        self.add = tf.keras.layers.Add()

    def call(self, x):
        attn_output = self.mha(
            query=x,
            value=x,
            key=x,
            use_causal_mask=True)
        x = self.add([x, attn_output])  # residual connection
        x = self.layernorm(x)
        return x
```
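The block keeps the input shape unchanged; here is a quick check on random features (illustrative values, not from the original code). Note that `use_causal_mask=True` requires a reasonably recent TensorFlow (2.10+):

```python
# With the causal mask, position i can only attend to positions <= i
csa = CausalSelfAttention(num_heads=2, key_dim=64)
x = tf.random.uniform((1, 5, 128))  # (batch, seq_len, d_model)
print(csa(x).shape)                 # (1, 5, 128)
```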
Define the Feed Forward block + Add & Norm layer
```python
class FeedForward(tf.keras.layers.Layer):
    # dff: hidden width (depth) of the feed-forward layer
    def __init__(self, d_model, dff, dropout_rate=0.1):
        super().__init__()
        self.seq = tf.keras.Sequential([
            tf.keras.layers.Dense(dff, activation='relu'),
            tf.keras.layers.Dense(d_model),
            tf.keras.layers.Dropout(dropout_rate)
        ])
        self.add = tf.keras.layers.Add()
        self.layer_norm = tf.keras.layers.LayerNormalization()

    def call(self, x):
        x = self.add([x, self.seq(x)])  # residual connection
        x = self.layer_norm(x)
        return x
```
Define the DecoderBlock
```python
class DecoderBlock(tf.keras.layers.Layer):
    def __init__(self, *, d_model, num_heads, dff, dropout_rate=0.1):
        super().__init__()
        self.self_attention = CausalSelfAttention(
            num_heads=num_heads,
            key_dim=d_model,
            dropout=dropout_rate)
        self.ffn = FeedForward(d_model, dff)  # dff: depth of the feed-forward layer

    def call(self, x):
        x = self.self_attention(x)
        x = self.ffn(x)
        return x
```
Define the Decoder
```python
class Decoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads,
                 dff, vocab_size, max_seq_len=100, dropout_rate=0.1):
        super().__init__()
        self.d_model = d_model
        self.num_layers = num_layers

        self.pos_embedding = PositionalEmbedding(
            vocab_size=vocab_size, d_model=d_model, max_seq_len=max_seq_len)
        self.dec_layers = [
            DecoderBlock(d_model=d_model,
                         num_heads=num_heads,
                         dff=dff,  # depth of the feed-forward layer
                         dropout_rate=dropout_rate)
            for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(dropout_rate)

    def call(self, x):
        x = self.pos_embedding(x)  # Shape `(batch_size, seq_len, d_model)`.

        # Add dropout.
        x = self.dropout(x)

        for i in range(self.num_layers):
            x = self.dec_layers[i](x)

        return x  # Shape `(batch_size, seq_len, d_model)`.
```
Build the GPT model
```python
class GPT(tf.keras.Model):
    def __init__(self, num_layers, d_model, num_heads, dff,
                 vocab_size,
                 max_seq_len,
                 fine_tuning_class_num,
                 dropout_rate=0.1):
        super().__init__()
        self.decoder = Decoder(num_layers, d_model, num_heads, dff,
                               vocab_size, max_seq_len, dropout_rate)
        self.final_layer = tf.keras.layers.Dense(vocab_size)
        self.fine_tuning_layer = tf.keras.layers.Dense(fine_tuning_class_num)

    def call(self, targets):
        decode_out = self.decoder(targets)
        final_out = self.final_layer(decode_out)
        # fine_tuning_out = self.fine_tuning_layer(tf.keras.layers.Flatten()(final_out))
        # GPT-2 does not need the fine-tuning output head; GPT-1 does.
        return final_out
```
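Before wiring up training, the whole model can be smoke-tested on random token ids (the hyperparameters and shapes below are illustrative, not from the original code):

```python
# Smoke test: the logits should have shape (batch, seq_len, vocab_size)
toy_gpt = GPT(num_layers=2, d_model=128, num_heads=4, dff=512,
              vocab_size=1000, max_seq_len=50, fine_tuning_class_num=2)
toy_ids = tf.random.uniform((2, 10), minval=1, maxval=1000, dtype=tf.int32)
print(toy_gpt(toy_ids).shape)  # (2, 10, 1000)
```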
Define train_step, the loss function, and the optimizer
```python
num_layers = 4
d_model = 128
dff = num_layers * d_model
num_heads = 8
# tokenizer_title and MAX_LENGTH come from the data-preparation/tokenization step (see the note at the end of this article)
target_vocab_size = tokenizer_title.vocab_size + 2  # +2 for the start and end tokens
max_seq_len = MAX_LENGTH
dropout_rate = 0.1

# Custom learning-rate schedule, taken from https://www.tensorflow.org/text/tutorials/transformer
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, d_model, warmup_steps=4000):
        super().__init__()
        self.d_model = tf.cast(d_model, tf.float32)
        self.warmup_steps = warmup_steps

    def __call__(self, step):
        step = tf.cast(step, dtype=tf.float32)
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps ** -1.5)
        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

# Define the optimizer
learning_rate = CustomSchedule(d_model)
optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)
```
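To see the warmup-then-decay behavior of CustomSchedule, the schedule can be plotted over training steps, as in the TensorFlow Transformer tutorial (an optional check, not part of the training code):

```python
# Visualize the learning-rate schedule
temp_schedule = CustomSchedule(d_model=128)
plt.plot(temp_schedule(tf.range(40000, dtype=tf.float32)))
plt.xlabel('train step')
plt.ylabel('learning rate')
plt.show()
```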
```python
# Define the loss and the metrics. from_logits=True means the model outputs raw logits,
# so the softmax is applied inside the loss computation.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

# Loss function: mask out padding positions (token id 0) so they do not contribute to the loss
def loss_fun(y_true, y_pred):
    mask = tf.math.logical_not(tf.math.equal(y_true, 0))  # True for non-padding positions
    loss_ = loss_object(y_true, y_pred)
    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask
    return tf.reduce_mean(loss_)
```
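A quick way to confirm that padding positions are excluded is to call loss_fun on a toy batch whose last target positions are 0 (a small sanity check with made-up values, not part of the original code):

```python
# Toy check: the last two target positions are padding (0) and are masked out of the loss
toy_targets = tf.constant([[5, 3, 0, 0]])   # (batch=1, seq_len=4)
toy_logits = tf.random.uniform((1, 4, 10))  # (batch, seq_len, vocab_size=10)
print(loss_fun(toy_targets, toy_logits).numpy())
```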
```python
# Initialize the model
gpt2 = GPT(num_layers, d_model, num_heads, dff,
           target_vocab_size,
           max_seq_len,
           fine_tuning_class_num=2,  # placeholder value; the fine-tuning head is unused for GPT-2
           dropout_rate=dropout_rate)

checkpoint_path = '/usr/data/checkpoint/train_gpt2_exp1'

ckpt = tf.train.Checkpoint(gpt2=gpt2,
                           optimizer=optimizer)

# Checkpoint manager
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=3)

if ckpt_manager.latest_checkpoint:
    ckpt.restore(ckpt_manager.latest_checkpoint)
    print('latest checkpoint restored')
```
```python
def train_step(targets):
    # Shift the sequence: the input is tokens [0, n-1], the target is tokens [1, n]
    tar_inp = targets[:, :-1]
    tar_real = targets[:, 1:]

    with tf.GradientTape() as tape:
        predictions = gpt2(tar_inp)
        loss = loss_fun(tar_real, predictions)

    # Compute the gradients
    gradients = tape.gradient(loss, gpt2.trainable_variables)
    # Apply the gradients (backpropagation update)
    optimizer.apply_gradients(zip(gradients, gpt2.trainable_variables))

    # Record the loss and accuracy
    train_loss(loss)
    train_accuracy(tar_real, predictions)
```
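If eager-mode training is slow, train_step can optionally be traced into a graph with tf.function, as the TensorFlow Transformer tutorial does. This is an optional tweak, not part of the original code; retracing may occur if the batch shape or dtype varies:

```python
# Optional speed-up: compile the Python train_step into a TensorFlow graph
train_step = tf.function(train_step)
```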
Define the train_loop
```python
EPOCHS = 20

# Used to plot the loss curve afterwards
step_list = []
loss_list = []
step = 0

for epoch in range(EPOCHS):
    start = time.time()

    # Reset the metrics
    train_loss.reset_states()
    train_accuracy.reset_states()

    for batch, all_inputs in enumerate(train_dataset):
        # One training step
        train_step(all_inputs)

        if batch % 100 == 0:
            loss = train_loss.result()
            print('epoch {}, batch {}, loss:{:.4f}, acc:{:.4f}'.format(
                epoch + 1, batch, loss, train_accuracy.result()
            ))
            step_list.append(step)
            loss_list.append(loss)
        step += 1

    if (epoch + 1) % 2 == 0:
        ckpt_save_path = ckpt_manager.save()
        print('epoch {}, save model at {}'.format(
            epoch + 1, ckpt_save_path
        ))

    print('epoch {}, loss:{:.4f}, acc:{:.4f}'.format(
        epoch + 1, train_loss.result(), train_accuracy.result()
    ))
    print('time in 1 epoch:{} secs\n'.format(time.time() - start))

plt.plot(step_list, loss_list)
plt.xlabel('train step')
plt.ylabel('loss')
```
Define the evaluation/inference functions
```python
def evaluate(inp_sentence):
    start_token = [tokenizer_title.vocab_size]
    end_token = [tokenizer_title.vocab_size + 1]

    # Add the start and end tokens
    inp_sentence = start_token + tokenizer_title.encode(inp_sentence) + end_token
    encoder_input = tf.expand_dims(inp_sentence, 0)

    decoder_input = [tokenizer_title.vocab_size]
    output = tf.expand_dims(decoder_input, 0)

    for i in range(MAX_LENGTH):
        predictions = gpt2(encoder_input)

        # Take the last token along the seq_len dimension (greedy/argmax decoding;
        # change this selection logic, e.g. to sampling, if more random generation is desired)
        predictions = predictions[:, -1:, :]  # (batch_size, 1, vocab_size)
        predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)

        # If predicted_id equals the end token, return the result
        if predicted_id == tokenizer_title.vocab_size + 1:
            return tf.squeeze(encoder_input, axis=0)

        # Concatenate predicted_id to the input and feed it back into the model as the next input
        encoder_input = tf.concat([encoder_input, predicted_id], axis=-1)
        output = tf.concat([output, predicted_id], axis=-1)

    return tf.squeeze(encoder_input, axis=0)
```
```python
def translate(sentence, plot=''):
    result = evaluate(sentence)

    predicted_sentence = tokenizer_title.decode([i for i in result if i < tokenizer_title.vocab_size])
    predicted_sentence = predicted_sentence.replace(" ", "")
    sentence = sentence.replace(" ", "")

    print('Input: {}'.format(sentence))
    print('Predicted output: {}'.format(predicted_sentence))
```
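An example call (the prompt below is just a hypothetical placeholder; use text from the domain of your own training corpus):

```python
# Hypothetical prompt; replace with text matching your training data
translate("今天 天气")
```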
At this point the GPT model is fully built. All that remains is to download training data yourself and perform word segmentation (with jieba) and tokenization (with a tfds text tokenizer), and you can start training the model!
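The data-preparation step is not shown in this article, so here is a rough sketch of what it could look like: jieba for word segmentation plus a tfds subword tokenizer whose encode/decode/vocab_size API matches the calls above (tfds.deprecated.text.SubwordTextEncoder). The file path, vocabulary size, MAX_LENGTH, and batch sizes below are all hypothetical placeholders; adapt them to your own corpus.

```python
import jieba
import tensorflow_datasets as tfds

# Hypothetical corpus file: one title/sentence per line
with open('titles.txt', encoding='utf-8') as f:
    corpus = [' '.join(jieba.cut(line.strip())) for line in f if line.strip()]

# Subword tokenizer providing encode/decode/vocab_size, as used by the code above
tokenizer_title = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
    corpus, target_vocab_size=2**13)

MAX_LENGTH = 40      # maximum sequence length; must match max_seq_len used by the model
BUFFER_SIZE = 20000
BATCH_SIZE = 64

def encode_sentence(s):
    # Add the start/end tokens expected by evaluate(): vocab_size and vocab_size + 1
    return [tokenizer_title.vocab_size] + tokenizer_title.encode(s) + [tokenizer_title.vocab_size + 1]

sequences = [encode_sentence(s) for s in corpus]
sequences = [s for s in sequences if len(s) <= MAX_LENGTH]

# Pad with 0 (the id reserved by mask_zero=True) and build the tf.data pipeline
padded = tf.keras.preprocessing.sequence.pad_sequences(
    sequences, maxlen=MAX_LENGTH, padding='post', value=0)
train_dataset = (tf.data.Dataset.from_tensor_slices(padded)
                 .shuffle(BUFFER_SIZE)
                 .batch(BATCH_SIZE))
```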