参考如何根据自己的数据集微调一个 Transformer 模型
我们将通过NLP中最常见的文本分类任务来学习如何在自己的数据集上利用迁移学习(transfer learning)微调一个预训练的Transformer模型——DistilBERT。DistilBERT是BERT的一个衍生版本,它的优点在它的性能与BERT相当,但是体积更小、更高效。所以我们可以在几分钟内训练一个文本分类器。
1 数据集
这里我们将使用英文推文情感数据集,这个数据集中包含了:anger,disgust,fear,joy,sadness和surprise六种情感类别。所以我们的任务是给定一段推文,训练一个可以将其分类成这六种基本情感的其中之一的模型。
现在我们来从Huggingface国内镜像下载数据集。
为了更好地分析数据,我们可以将 Dataset 对象转成 Pandas DataFrame,然后就可以利用各种高级 API 可视化数据集了。
import json
import pandas as pd
filename = "train.jsonl"
with open(filename,'r',encoding='utf-8') as fr:
data_list = fr.readlines()
text_list =[]
label_list = []
for data in data_list:
data_dict = json.loads(data.strip('\n'))
text_list.append(data_dict['text'])
label_list.append(data_dict['label'])
df = pd.DataFrame({"text":text_list,"label":label_list})
# 情感映射
emotion_map = {0:'sadness',1:'joy',2:'love',3:'anger',4:'fear',5:'surprise'}
df['label_name'] = df['label'].map(emotion_map)
1.1 类别分布是否均衡
处理任何分类任务之前,都要看一下样本的类别分布是否均衡,不均衡类别分布的数据集在训练损失和评估指标方面可能需要与平衡数据集做不同的处理。
import matplotlib.pyplot as plt
df['label_name'].value_counts(ascending=True).plot.barh()
plt.title('Frequency of Classes')
plt.show()
类别分布严重不均衡!joy和sadness类样本数量最多,而love和surprise类的样本数量几乎要少5-10倍。
有好几种方法可以处理类别不均衡问题:
(1)对样本数量少的类别进行随机上采样。
(2)对样本数量少的类别进行随机下采样。
(3)对于样本数量不足的类别收集更多样本。
这里不做任何处理。
1.2 上下文token长度
无论是哪个Transformer模型,它都有上下文长度限制(maximum context size)。GPT-4 Turbo的上下文长度已经达到了128k个token!不过DistilBERT 只有512个。
token指的是不能再被拆分的文本原子,这里就简单理解为单词就好。
df['Words Per Tweet'] = df['text'].str.split().apply(len)
df.boxplot('Words Per Tweet', by='label_name')
plt.show()
从上图可以看到最长的推文长度也没超过512,大多数长度在15左右,完全符合 DistilBERT的要求。比模型最长上下文限制还要长的文本需要被截断,如果截断的文本包含关键信息,这可能会导致性能损失,不过这里没有这个问题。
2 Token分词
像DistilBERT这样的Transformer模型无法接受原始的字符串作为输入,我们必须将文本拆分成一个个token(这一过程称为tokenized),然后编码成数值向量表示。
将文本拆分成模型可用的原子单位的步骤称为tokenization。对于英文来说有 character tokenization和word tokenization。我们这里简单地见识一下,不深入探讨。
2.1 character tokenization按字符拆分
以英文为例,对于character tokenization来说。
(1)将原始文本拆分成一个个字符,也就是26个大小写字母加标点符号。
(2)建立一个字符到唯一整数映射的映射关系表。
(3)将字符映射到唯一的整数表示input_ids。
(4)将input_ids转成2D的one-hot encoding向量。
前三步,构成词汇表,并根据映射向量化表示。
# 原始文本
text = 'Tokenizing text is a core task of NLP.'
# 将原始文本拆分成字符
tokenized_text = list(text)
print(len(tokenized_text),tokenized_text) # 如38 ['T', 'o', 'k', 'e', 'n']
# 字符到整数的映射关系
token2idx = {ch:idx for idx,ch in enumerate(sorted(set(tokenized_text)))}
print(len(token2idx),token2idx) # 如20 {' ': 0, 'L': 1, 'N': 2, 'P': 3, 'T': 4}
# 字符映射到唯一整数
input_ids = [token2idx[token] for token in tokenized_text]
print(len(input_ids),input_ids) # 如38 [4, 13, 11, 7, 12]
最后一步,将向量化表示,转化成2D的稀疏张量
# 2D one-hot向量
import torch
import torch.nn.functional as F
input_ids = torch.tensor(input_ids)
one_hot_encodings = F.one_hot(input_ids,num_classes=len(token2idx))
print(one_hot_encodings.shape) # torch.Size([38, 20])
one_hot_encodings
因为有38个字符,所以对应38行。每一个字符由一个长为20的稀疏张量表示。
character-level tokenization忽略了文本的结构,将字符串看成是一连串的字符流,尽管这种方法可以处理拼写错误和罕见的单词。其主要缺点是需要从数据中学习单词等语言结构。这需要大量的计算、内存和数据。因此,这种方法在实践中很少使用。
2.2 word tokenization按单词拆分
word tokenization就是按照单词维度来拆分文本。
前三步,构成词汇表,并根据映射向量化表示。
# 原始文本
text = 'Tokenizing text is a core task of NLP.'
# 将原始文本拆分成单词
tokenized_text = text.split()
print(len(tokenized_text),tokenized_text) # 如8 ['Tokenizing', 'text', 'is']
# 单词到整数的映射关系
token2idx = {ch:idx for idx,ch in enumerate(sorted(set(tokenized_text)))}
print(len(token2idx),token2idx) # 如8 {'NLP.': 0, 'Tokenizing': 1, 'a': 2}
# 单词映射到唯一整数
input_ids = [token2idx[token] for token in tokenized_text]
print(len(input_ids),input_ids) # 如8 [1, 7, 4, 2, 3, 6, 5, 0]
最后一步,将向量化表示,转化成2D的稀疏张量
# 2D one-hot向量
import torch
import torch.nn.functional as F
input_ids = torch.tensor(input_ids)
one_hot_encodings = F.one_hot(input_ids,num_classes=len(token2idx))
print(one_hot_encodings.shape) # torch.Size([8, 8])
one_hot_encodings
因为有8个词,所以对应8行。每一个词由一个长为8的稀疏张量表示。
不过character tokenization的词汇表最多只有几百个(对英文来说,26 个大小写字母和标点符号)。但是word tokenziation形成的词汇表可能有数千甚至数万之多,尤其是英文这种每个单词还有不同的形式变化的语言。
subword tokenization可以看成是character tokenization和word tokenization的折中方法。NLP中有不少算法可以实现subword tokenization,BERT和 DistilBERT都是采用WordPiece算法。
2.3 下载模型预训练过的tokenizer
一、每个模型都有自己tokenization方法,所以要从对应模型的checkpoint下载预训练过的tokenizer。
有时需要指定最大上下文长度model_max_length。
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained('D:\distilbert-base-uncasedmodel')
# 查询基本的rokenizer信息
print("词汇表尺寸",tokenizer.vocab_size)
print("最大上下文长度",tokenizer.model_max_length)
print("模型期望接收的输入字段",tokenizer.model_input_names)
二、tokens和ids可以互相转换
两个##号表示这个token和前面的token组合到一起。
# 原始文本
text = 'Tokenizing text is a core task of NLP.'
#所有字母变小写,tokenizing和nlp被拆分,##表示这个token和前面的token组合到一起
tokens = tokenizer.tokenize(text)
print(f"tokens:{tokens}")
input_ids = tokenizer.convert_tokens_to_ids(tokens)
print(f"input_ids:{input_ids}")
tokens = tokenizer.convert_ids_to_tokens(input_ids)
print(f"tokens:{tokens}")
三、模型期望接收的输入字段
[‘input_ids’, ‘attention_mask’]。
# 原始文本
text = 'Tokenizing text is a core task of NLP.'
encoded_text = tokenizer(text)
print(f"encoded_text:{encoded_text}")
tokens2ids = list(zip(tokenizer.all_special_tokens,tokenizer.all_special_ids))
data = sorted(tokens2ids,key=lambda x:x[-1])
df =pd.DataFrame(data)
df.columns = ['Special Token','Special Token ID']
df.T
首先input_ids字段还是token对应的整数,但是首尾增加了标识序列开头和结尾的特殊 token:[CLS] 和 [SEP]。
现在再来看看attention_mask字段。当批量处理文本时,每个文本的长度都不一样。
如果最长的文本超过模型的最长上下文限制,则直接截断多余的部分。
在其余短文本后面附加padding token,使它们的长度都一致。
attention_mask为0的部分表示对应的token是为了扩展长度而引入的padding token,模型无需理会。
3 模型架构
像DistilBERT这样的模型的预训练目标是预测文本序列中的mask词,所以我们并不能直接拿来做文本分类任务。像DistilBERT这种encoder-based Transformer模型架构通常由一个预训练的body和对应分类任务的head组成。
(1)首先我们将文本进行tokenization处理,形成称为token encodings的one-hot向量。tokenizer词汇表的大小决定了token encodings的维度,通常在20k-30k。
(2)然后,token encodings被转成更低维度的token embeddings向量,比如 768维,在embedding空间中,意思相近的token的embedding向量表示的距离也会更相近。
(3)然后token embeddings经过一系列的encoder层,最终每个token都生成了一个hidden state。
现在我们有两种选择:
3.1 方案一【特征抽取器】
将Transformer模型视为特征抽取模型,我们不改变原模型的权重,仅仅将 hidden state作为每个文本的特征,然后训练一个分类模型,比如逻辑回归。
所以我们需要在训练时冻结body部分的权重,仅更新head的权重。
这样做的好处是即使 GPU 不可用时我们也可以快速训练一个小模型。
让我们先下载模型。
3.1.1 加载预训练模型
一、加载模型
from transformers import AutoModel
model_ckpt = 'D:\distilbert-base-uncasedmodel'
device = torch.device("cuda" if torch.cuda.is_available() else 'cpu')
model = AutoModel.from_pretrained(model_ckpt).to(device)
这个模型就会将token encoding转成embedding,再经过若干encoder层输出 hidden state。
二、应用举例
text = 'this is a test'
inputs = tokenizer(text,return_tensors='pt')
print(f'inputs={inputs}')
print(f"token={tokenizer.convert_ids_to_tokens(inputs['input_ids'][0])}")
inputs = {k:v.to(device) for k,v in inputs.items()}
with torch.no_grad():
outputs = model(**inputs)
print(outputs)
print(outputs.last_hidden_state.shape) # torch.Size([1, 6, 768])
print(outputs.last_hidden_state[:,0].shape) # torch.Size([1, 768])
3.1.2 抽取特征
在分类任务中,习惯用[CLS] token对应的hidden state作为句子特征,所以我们先写一个特征抽取函数。
def extract_hidden_states(batch):
# 将model inputs放入GPU
inputs = {k:v.to(device) for k,v in batch.items()
if k in tokenizer.model_input_names}
# 抽取last hidden states
with torch.no_grad():
last_hidden_state = model(**inputs).last_hidden_state
# 返回vector for [CLS] token
return last_hidden_state[:,0].numpy()
def convert_fun(text):
inputs = tokenizer(text,return_tensors='pt')
hidden_state = extract_hidden_states(inputs)
return hidden_state
抽取数据集特征
import json
import pandas as pd
import numpy as np
filename = "train.jsonl"
with open(filename,'r',encoding='utf-8') as fr:
data_list = fr.readlines()
text_list =[]
label_list = []
for data in data_list:
data_dict = json.loads(data.strip('\n'))
text_list.append(data_dict['text'])
label_list.append(data_dict['label'])
df = pd.DataFrame({"text":text_list,"label":label_list})
emotion_map = {0:'sadness',1:'joy',2:'love',3:'anger',4:'fear',5:'surprise'}
df['label_name'] = df['label'].map(emotion_map)
df["hidden"] = df['text'].apply(convert_fun) # 二维
df['hidden'] = df['hidden'].map(lambda x:np.array(x[0])) # 一维
train_hidden = df['hidden'].values # (16000,)
train_labels = df['label'].values
X_train = [hidden for hidden in train_hidden]
X_train = np.array(X_train) # (16000, 768)
y_train = train_labels
3.1.3 训练逻辑回归模型
训练一个逻辑回归模型去预测推文情绪类别。
from sklearn.linear_model import LogisticRegression
lr_clf = LogisticRegression(max_iter=3000)
lr_clf.fit(X_train,y_train)
print('模型的平均正确率为{}'.format(lr_clf.score(X_train,y_train)))
模型的平均正确率为0.6939375,效果并不是很好。
from sklearn.metrics import ConfusionMatrixDisplay,confusion_matrix
def plot_confusion_matrix(y_preds,y_true,labels):
cm = confusion_matrix(y_true,y_preds,normalize='true')
fig,ax = plt.subplots(figsize=(6,6))
disp = ConfusionMatrixDisplay(confusion_matrix=cm,display_labels=labels)
disp.plot(cmap='Blues',values_format='.2f',ax =ax,colorbar=False)
plt.title('Normalized confusion matrix')
plt.show()
labels = ['sadness','joy','love','anger','fear','surprise']
y_preds = lr_clf.predict(X_train)
plot_confusion_matrix(y_preds,y_train,labels)
从混淆矩阵可以看到anger和fear通常会被误分类成sadness,love和surprise 也总会被误分类成joy。
3.2 方案二【微调Transformer模型】
此时我们不再将预训练的Transformer模型当作特征抽取器了,我们也不会将 hidden state作为固定的特征了,我们会从头训练整个整个Transformer模型,也就是会更新预训练模型的权重。
如下图所示,此时head部分要可导了,不能使用逻辑回归这样的机器学习算法了,我们可以使用神经网络。
3.2.1 构建数据加载器
(1)定义一个取数据的函数
import json
import pandas as pd
def get_origin_data(filename):
with open(filename,'r',encoding='utf-8') as fr:
data_list = fr.readlines()
text_list =[]
label_list = []
for data in data_list:
data_dict = json.loads(data.strip('\n'))
text_list.append(data_dict['text'])
label_list.append(data_dict['label'])
df = pd.DataFrame({"text":text_list,"label":label_list})
emotion_map = {0:'sadness',1:'joy',2:'love',3:'anger',4:'fear',5:'surprise'}
df['label_name'] = df['label'].map(emotion_map)
return df
(2)获取训练集、测试集、验证集
filename_train = "train.jsonl"
filename_test = "test.jsonl"
filename_validation = "validation.jsonl"
df_train = get_origin_data(filename_train)
df_test = get_origin_data(filename_test)
df_validation = get_origin_data(filename_validation)
train_texts = df_train['text'].values
train_labels = df_train['label'].values
test_texts = df_train['text'].values
test_labels = df_train['label'].values
validation_texts = df_train['text'].values
validation_labels = df_train['label'].values
(3)分词
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained('D:\distilbert-base-uncasedmodel',model_max_length=512)
train_encodings = tokenizer(list(train_texts), truncation=True, padding=True)
test_encodings = tokenizer(list(test_texts), truncation=True, padding=True)
validation_encodings = tokenizer(list(validation_texts), truncation=True, padding=True)
(4)构建数据集,可以按索引选取指定数量的样本
import torch
class EMOTIONDataset(torch.utils.data.Dataset):
def __init__(self, encodings, labels):
self.encodings = encodings
self.labels = labels
def __getitem__(self, idx):
item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
item['labels'] = torch.tensor(self.labels[idx])
return item
def __len__(self):
return len(self.labels)
train_dataset = EMOTIONDataset(train_encodings, train_labels)
test_dataset = EMOTIONDataset(test_encodings, test_labels)
validation_dataset = EMOTIONDataset(validation_encodings, validation_labels)
3.2.2 加载预训练模型
pip install accelerate
(1)首先我们加载预训练模型,从下方的警告信息可以看到此时模型一部分参数是随机初始化的。
指定模型的分类数量num_labels。
指定优化器。
from transformers import AutoModelForSequenceClassification
torch.backends.cudnn.deterministic = True
RANDOM_SEED = 123
torch.manual_seed(RANDOM_SEED)
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
NUM_EPOCHS = 1
num_labels = 6
model = AutoModelForSequenceClassification.from_pretrained('D:\distilbert-base-uncasedmodel',
num_labels=num_labels)
model.to(DEVICE)
model.train();
optim = torch.optim.Adam(model.parameters(), lr=5e-5)
(2)定义准确率作为微调模型时的性能衡量指标。
from sklearn.metrics import accuracy_score
def compute_metrics(pred):
labels = pred.label_ids
preds = pred.predictions.argmax(-1)
acc = accuracy_score(labels, preds)
return {"accuracy": acc}
(3)定义一些训练模型时的超参数设定。
from transformers import Trainer, TrainingArguments
training_args = TrainingArguments(
output_dir='./results',
num_train_epochs=1,
per_device_train_batch_size=16,
per_device_eval_batch_size=16,
logging_dir='./logs',
logging_steps=16
)
trainer = Trainer(
model=model,
compute_metrics=compute_metrics,
args=training_args,
train_dataset=train_dataset,
eval_dataset=validation_dataset,
optimizers=(optim, None) # optimizer and learning rate scheduler
)
(4)全部就绪后,就可以训练模型了,我们这里训练1个 epoch。
import time
start_time = time.time()
trainer.train()
print(f'Total Training Time: {(time.time() - start_time)/60:.2f} min')
已经使用的时间<还需要多少时间
3.2.3 查看验证集上的混淆矩阵
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import ConfusionMatrixDisplay,confusion_matrix
def plot_confusion_matrix(y_preds,y_true,labels):
cm = confusion_matrix(y_true,y_preds,normalize='true')
fig,ax = plt.subplots(figsize=(6,6))
disp = ConfusionMatrixDisplay(confusion_matrix=cm,display_labels=labels)
disp.plot(cmap='Blues',values_format='.2f',ax =ax,colorbar=False)
plt.title('Normalized confusion matrix')
plt.show()
labels = ['sadness','joy','love','anger','fear','surprise']
preds_output = trainer.predict(validation_dataset)
y_preds = np.argmax(preds_output.predictions,axis=1)
plot_confusion_matrix(y_preds,validation_labels,labels)
可以看到此时的混淆矩阵已经十分接近对角矩阵了,比之前的好多了。
3.2.4 应用示例
emotion_map = {0:'sadness',1:'joy',2:'love',3:'anger',4:'fear',5:'surprise'}
custom_tweet = "I saw a movie today and it was really surprise"
custom_encodings = tokenizer([custom_tweet], truncation=True, padding=True)
custom_dataset = EMOTIONDataset(custom_encodings,labels=[[0]])
preds = trainer.predict(custom_dataset).predictions
print("preds=",preds)
idx_max = np.argmax(preds,axis=1)[0] # 最大值对应的索引
print("predict_label=",emotion_map[idx_max])
输出如下:
preds= [[-2.0667038 2.0326774 -1.4972531 -1.8979816 -0.48651505 2.1575205 ]]
predict_label= surprise
4 总结
要使用Transformers中的Trainer训练自定义的BERT下游模型,并进行评估,需要进行以下步骤:
(1)准备数据集:将原始数据集转换为适合 BERT 模型训练的格式,例如使用 tokenizer对文本进行编码,将标签转换为数字等。
(2)定义模型:定义一个自定义的BERT模型,并根据任务类型添加相应的输出层。
(3)定义数据加载器:使用Dataset和DataLoader对数据集进行加载和处理,以便在训练期间以批量的方式输入模型。
(4)定义训练参数:设置训练参数,例如批量大小、学习率、训练周期等。
(5)定义评估指标:选择适合任务的评估指标,例如准确率、F1 值等。
(6)创建 Trainer 对象:使用自定义的模型、数据加载器、训练参数和评估指标创建 Trainer 对象。
(7)训练模型:使用Trainer.train()函数训练模型。在训练过程中,Trainer会自动调整学习率、记录日志、保存模型等。
(8)评估模型:使用Trainer.evaluate()函数评估模型性能。该函数会计算模型在给定数据集上的评估指标。
from transformers import Trainer, TrainingArguments
import torch
# 准备数据集
train_dataset = ...
eval_dataset = ...
# 定义模型
model = ...
# 定义数据加载器
train_loader = ...
eval_loader = ...
# 定义训练参数
training_args = TrainingArguments(
output_dir='./results', # 输出目录
num_train_epochs=3, # 训练周期数
per_device_train_batch_size=16, # 批量大小
per_device_eval_batch_size=64, # 验证批量大小
warmup_steps=500, # warmup 步骤数
weight_decay=0.01, # 权重衰减
logging_dir='./logs', # 日志目录
logging_steps=10,
evaluation_strategy='steps',
eval_steps=50,
save_strategy='epoch',
save_steps=1000,
)
# 定义评估指标
def compute_metrics(pred):
labels = pred.label_ids
preds = pred.predictions.argmax(-1)
acc = torch.sum(preds == labels) / len(labels)
return {'accuracy': acc}
# 创建 Trainer 对象
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=eval_dataset,
data_collator=data_collator,
compute_metrics=compute_metrics,
)
# 训练模型
trainer.train()
# 评估模型
trainer.evaluate()
在上面的示例中,compute_metrics() 函数计算模型在验证集上的准确率。如果想要对模型进行推理,可以使用 Trainer 的 predict() 方法。例如:
predictions = trainer.predict(test_dataset)