Transformers实战(一)快速入门命名实体识别、多项选择
1、命名实体识别(NER)
1.1 命名实体识别简介
1.1.1 NER数据标注体系
命名实体识别
(Named Entity Recognition,简称NER)是指识别文本中具有特定意义的实体,主要包括人名、地名、机构名、专有名词等。
通常包括两部分:
(1)实体边界识别
(2)确定实体类别 (人名、地名、机构名或其他)。
例如:”小明在北京上班“
实体类别 | 实体 |
---|---|
地点LOC | 北京 |
人物PER | 小明 |
命名实体识别数据标注体系有很多,比如IOB1、IOB2、IOE1、IOE2、IOBES等。
先介绍下IOB2标注体系:
I表示实体内部、O表示实体外部、B表示实体开始
B/I-XXX,XXX表示具体的类别。
例如:
# IOB2标注列表如下:
['O', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC']
O表示实体外部
B-PER表示人名开始
I-PER表示人名中间
B-ORG表示组织名开始
I-ORG表示组织名中间
B-LOC表示地名开始
I-LOC表示地名开始
# 标注好的数据,实际是上述列表中元素所在的位置
# 例如,【厦门】用【5,6】表示【B-LOC,I-LOC】
下面是一个示例:
['海', '钓', '比', '赛', '地', '点', '在', '厦', '门', '与', '金', '门', '之', '间', '的', '海', '域', '。']
[ 0, 0, 0, 0, 0, 0, 0, 5, 6, 0, 5, 6, 0, 0, 0, 0, 0, 0]
再介绍一种IOBES标注体系:
-
I表示实体内部,O表示实体外部,B表示实体开始,E表示实体结束,S表示一个词单独形成一个命名实体
-
有时也会使用M代替I,但本质是同一含义
1.1.2 NER评估指标
我们可以用Precision、Recall、F1值来进行评估。
举例如下:
1.1.3 NER所用模型
命名实体识别任务用到的模型是AutoModelForTokenClassification,即对每个token进行分类;
此外,关于Padding文本分类任务使用的是DataCollatorWithPadding,这里使用的是DataCollatorForTokenClassification,这个类是专门针对Token分类任务的。
我们可以在transformers的BertForTokenClassification中看到模型的源码:
class BertForTokenClassification(BertPreTrainedModel):
def __init__(self, config):
super().__init__(config)
self.num_labels = config.num_labels
self.bert = BertModel(config, add_pooling_layer=False)
classifier_dropout = (
config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob
)
self.dropout = nn.Dropout(classifier_dropout)
# num_labels类
self.classifier = nn.Linear(config.hidden_size, config.num_labels)
# Initialize weights and apply final processing
self.post_init()
@add_start_docstrings_to_model_forward(BERT_INPUTS_DOCSTRING.format("batch_size, sequence_length"))
def forward(
self,
input_ids: Optional[torch.Tensor] = None,
attention_mask: Optional[torch.Tensor] = None,
token_type_ids: Optional[torch.Tensor] = None,
position_ids: Optional[torch.Tensor] = None,
head_mask: Optional[torch.Tensor] = None,
inputs_embeds: Optional[torch.Tensor] = None,
labels: Optional[torch.Tensor] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
) -> Union[Tuple[torch.Tensor], TokenClassifierOutput]:
r"""
labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
Labels for computing the token classification loss. Indices should be in `[0, ..., config.num_labels - 1]`.
"""
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
# 先经过bert
outputs = self.bert(
input_ids,
attention_mask=attention_mask,
token_type_ids=token_type_ids,
position_ids=position_ids,
head_mask=head_mask,
inputs_embeds=inputs_embeds,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
# sequence_output的shape为[batch_size, seq_len, hidden_size]
# 其中,hidden_size=768
# 将【last_hidden_state】输入到全连接层进行分类
sequence_output = outputs[0]
sequence_output = self.dropout(sequence_output)
# logits的shape为[batch_size, seq_len, num_labels]
logits = self.classifier(sequence_output)
loss = None
if labels is not None:
# 计算交叉熵损失
loss_fct = CrossEntropyLoss()
loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
if not return_dict:
output = (logits,) + outputs[2:]
return ((loss,) + output) if loss is not None else output
return TokenClassifierOutput(
loss=loss, # 训练最后一个epoch后的损失
logits=logits, # 每个token属于num_labels每一项的概率
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
)
1.2 基于transformers进行命名实体识别
1.2.1 加载数据集
模型使用哈工大开源的chinese-macbert-base
这次使用的数据集是人民日报数据集,任务是命名实体识别,该数据集可以在datasets官网中找到,名称是peoples_daily_ner
import evaluate
from transformers import AutoTokenizer, AutoModelForTokenClassification
from transformers import TrainingArguments, Trainer, DataCollatorForTokenClassification
import warnings
warnings.filterwarnings('ignore')
# 如果可以联网,直接使用load_dataset进行加载
#ner_datasets = load_dataset("peoples_daily_ner", cache_dir="./data")
# 离线加载数据
from datasets import DatasetDict
ner_datasets = DatasetDict.load_from_disk("ner_data")
# 可以看到,数据集被成功加载,而且按照标准的训练集、验证集和测试集划分好了,无需我们再额外划分。
ner_datasets
DatasetDict({
train: Dataset({
features: ['id', 'tokens', 'ner_tags'],
num_rows: 20865
})
validation: Dataset({
features: ['id', 'tokens', 'ner_tags'],
num_rows: 2319
})
test: Dataset({
features: ['id', 'tokens', 'ner_tags'],
num_rows: 4637
})
})
# 可以看到,数据已经被划分为了token,存放在tokens字段中
# 标签存放在ner_tags字段中,但是这里的标签值是数值类型的,我们还需要进一步获取各个数值的含义。
print(ner_datasets["train"][0])
{
'id': '0',
'tokens': ['海', '钓', '比', '赛', '地', '点', '在', '厦', '门', '与', '金', '门', '之', '间', '的', '海', '域', '。'],
'ner_tags': [0, 0, 0, 0, 0, 0, 0, 5, 6, 0, 5, 6, 0, 0, 0, 0, 0, 0]
}
# features中包含着各个字段的信息
ner_datasets["train"].features
{
'id': Value(dtype='string', id=None),
'tokens': Sequence(feature=Value(dtype='string', id=None), length=-1, id=None),
'ner_tags': Sequence(feature=ClassLabel(names=['O', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC'], id=None), length=-1, id=None)
}
# 在ner_tags这一feature中存在着我们需要的信息,直接将该值提取出来即可
label_list = ner_datasets["train"].features["ner_tags"].feature.names
label_list
['O', 'B-PER', 'I-PER', 'B-ORG', 'I-ORG', 'B-LOC', 'I-LOC']
1.2.2 数据预处理
# 注意:需要魔法流量
# tokenizer = AutoTokenizer.from_pretrained("hfl/chinese-macbert-base")
model_path = '/root/autodl-fs/models/chinese-macbert-base'
tokenizer = AutoTokenizer.from_pretrained(model_path)
# 对于已经做好tokenize的数据,要指定is_split_into_words参数为True
tokenizer(ner_datasets["train"][0]["tokens"], is_split_into_words=True)
{
'input_ids': [101, 3862, 7157, 3683, 6612, 1765, 4157, 1762, 1336, 7305, 680, 7032, 7305, 722, 7313, 4638, 3862, 1818, 511, 102],
'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
}
# 对于labels,可以把word_ids中为None的值设置为-100
'''
举例如下:
【shang hai】原始的ner_tags = [5,6],5代表'B-LOC', 6代表'I-LOC'
假如经过分词后,【shang】被分为4个词,【hai】分为1个词,那么其word_ids为【None 0 0 0 0 1 None】
分词后input_ids【101 6 7 8 9 10 102】,特殊词元:101为cls、102为sep
那么其labels应该为【-100 5 5 5 5 6 -100】
'''
def process_function(examples):
tokenized_exmaples = tokenizer(examples["tokens"], max_length=128, truncation=True, is_split_into_words=True)
labels = []
for i, label in enumerate(examples["ner_tags"]):
word_ids = tokenized_exmaples.word_ids(batch_index=i)
label_ids = []
for word_id in word_ids:
if word_id is None:
label_ids.append(-100)
else:
label_ids.append(label[word_id])
labels.append(label_ids)
tokenized_exmaples["labels"] = labels
return tokenized_exmaples
# 定义完数据处理函数,便可以使用map方法,对数据集进行处理
# 不要忘了指定batched参数值为True,这样会加速数据处理。
tokenized_datasets = ner_datasets.map(process_function, batched=True)
tokenized_datasets
DatasetDict({
train: Dataset({
features: ['id', 'tokens', 'ner_tags', 'input_ids', 'token_type_ids', 'attention_mask', 'labels'],
num_rows: 20865
})
validation: Dataset({
features: ['id', 'tokens', 'ner_tags', 'input_ids', 'token_type_ids', 'attention_mask', 'labels'],
num_rows: 2319
})
test: Dataset({
features: ['id', 'tokens', 'ner_tags', 'input_ids', 'token_type_ids', 'attention_mask', 'labels'],
num_rows: 4637
})
})
print(tokenized_datasets["train"][0])
{
'id': '0',
'tokens': ['海', '钓', '比', '赛', '地', '点', '在', '厦', '门', '与', '金', '门', '之', '间', '的', '海', '域', '。'],
'ner_tags': [0, 0, 0, 0, 0, 0, 0, 5, 6, 0, 5, 6, 0, 0, 0, 0, 0, 0],
'input_ids': [101, 3862, 7157, 3683, 6612, 1765, 4157, 1762, 1336, 7305, 680, 7032, 7305, 722, 7313, 4638, 3862, 1818, 511, 102],
'token_type_ids': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
'labels': [-100, 0, 0, 0, 0, 0, 0, 0, 5, 6, 0, 5, 6, 0, 0, 0, 0, 0, 0, -100]
}
1.2.3 加载预训练模型
# 配置训练器之前,需要先加载模型,这里的模型需要和前面分词器的模型一致
# 这里除了要指定模型名称外,还要指定num_labels参数,值为label值的个数。
# model = AutoModelForTokenClassification.from_pretrained("hfl/chinese-macbert-base", num_labels=len(label_list))
model = AutoModelForTokenClassification.from_pretrained(model_path, num_labels=len(label_list))
model.config.num_labels # 7
1.2.4 创建评估函数
# pip install seqeval
# seqeval = evaluate.load("seqeval")
# 这里本地加载
seqeval = evaluate.load("seqeval_metric.py")
seqeval
EvaluationModule(name: "seqeval", module_type: "metric", features: {'predictions': Sequence(feature=Value(dtype='string', id='label'), length=-1, id='sequence'), 'references': Sequence(feature=Value(dtype='string', id='label'), length=-1, id='sequence')}, usage: """
Produces labelling scores along with its sufficient statistics
from a source against one or more references.
Args:
predictions: List of List of predicted labels (Estimated targets as returned by a tagger)
references: List of List of reference labels (Ground truth (correct) target values)
suffix: True if the IOB prefix is after type, False otherwise. default: False
scheme: Specify target tagging scheme. Should be one of ["IOB1", "IOB2", "IOE1", "IOE2", "IOBES", "BILOU"].
default: None
mode: Whether to count correct entity labels with incorrect I/B tags as true positives or not.
If you want to only count exact matches, pass mode="strict". default: None.
sample_weight: Array-like of shape (n_samples,), weights for individual samples. default: None
zero_division: Which value to substitute as a metric value when encountering zero division. Should be on of 0, 1,
"warn". "warn" acts as 0, but the warning is raised.
Returns:
'scores': dict. Summary of the scores for overall and per type
Overall:
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1': F1 score, also known as balanced F-score or F-measure,
Per type:
'precision': precision,
'recall': recall,
'f1': F1 score, also known as balanced F-score or F-measure
Examples:
>>> predictions = [['O', 'O', 'B-MISC', 'I-MISC', 'I-MISC', 'I-MISC', 'O'], ['B-PER', 'I-PER', 'O']]
>>> references = [['O', 'O', 'O', 'B-MISC', 'I-MISC', 'I-MISC', 'O'], ['B-PER', 'I-PER', 'O']]
>>> seqeval = evaluate.load("seqeval")
>>> results = seqeval.compute(predictions=predictions, references=references)
>>> print(list(results.keys()))
['MISC', 'PER', 'overall_precision', 'overall_recall', 'overall_f1', 'overall_accuracy']
>>> print(results["overall_f1"])
0.5
>>> print(results["PER"]["f1"])
1.0
import numpy as np
# 该函数输入要求是BIO形式的标签序列,因此我们还需要将模型预测结果进行转换
# 根据真实标签不等于-100进行过滤即可
def eval_metric(pred):
predictions, labels = pred
predictions = np.argmax(predictions, axis=-1)
# 将id转换为原始的字符串类型的标签
true_predictions = [
[label_list[p] for p, l in zip(prediction, label) if l != -100]
for prediction, label in zip(predictions, labels)
]
true_labels = [
[label_list[l] for p, l in zip(prediction, label) if l != -100]
for prediction, label in zip(predictions, labels)
]
result = seqeval.compute(predictions=true_predictions, references=true_labels, mode="strict", scheme="IOB2")
return {
"f1": result["overall_f1"]
}
1.2.5 创建TrainingArguments及Trainer
'''
下面设置训练参数,
输出文件夹为models_for_ner,
配置学习率为2e-5,
训练时batch大小为64,
验证时为batch大小128,
评估策略为训练完一个epoch之后进行评估,
模型保存策略同上,
指定最优模型的评估指标为f1,
设置训练完成后加载最优模型,
权重衰减大小为0.01,
日志记录的步长为50,即50个batch记录一次,
训练轮数为3
'''
args = TrainingArguments(
learning_rate=2e-5,
output_dir="models_for_ner", # 输出文件夹
per_device_train_batch_size=16, # 训练时的batch_size
per_device_eval_batch_size=16, # 验证时的batch_size
evaluation_strategy="epoch", # 评估策略
save_strategy="epoch", # 保存策略
metric_for_best_model="f1", # 评估指标
load_best_model_at_end=True, # 训练完成后加载最优模型
weight_decay=0.01, # 权重衰减策略
logging_steps=50, # log 打印的频率
num_train_epochs=3 # 训练epochs的次数
)
'''
设置完训练参数,就可以构建训练器了。
第一个参数指定模型,
第二个参数指定训练参数,
接下来依次指定训练数据集与验证数据集,这里验证数据集使用了测试集,
而后指定评估函数,
最后指定data_collator的值为DataCollatorForTokenClassification的实例对象。
'''
trainer = Trainer(
model=model, # 预训练模型
args=args, # 训练参数
train_dataset=tokenized_datasets["train"], # 训练集
eval_dataset=tokenized_datasets["validation"], # 验证集
compute_metrics=eval_metric, # 指标评估的方法
data_collator=DataCollatorForTokenClassification(tokenizer=tokenizer) # DataCollator,填充到一个批次中最大长度,加快填充的速度
)
trainer.train()
1.2.6 模型预测
from transformers import pipeline
# 使用pipeline进行推理,要指定id2label
model.config.id2label = {idx: label for idx, label in enumerate(label_list)}
# 如果模型是基于GPU训练的,那么推理时要指定device
# 对于NER任务,可以指定aggregation_strategy为simple,得到具体的实体的结果,而不是token的结果
ner_pipe = pipeline("token-classification", model=model, tokenizer=tokenizer, device=0, aggregation_strategy="simple")
line = '近一年多的时间内,北京市与上海市同时通车了有轨电车项目,这对于现代有轨电车这种技术在中国的发展,正是一个非常大的激励。--央视李明报道'
res = ner_pipe(line)
res
[{'entity_group': 'LOC',
'score': 0.9993908,
'word': '北 京 市',
'start': 9,
'end': 12},
{'entity_group': 'LOC',
'score': 0.9993803,
'word': '上 海 市',
'start': 13,
'end': 16},
{'entity_group': 'LOC',
'score': 0.9997332,
'word': '中 国',
'start': 42,
'end': 44},
{'entity_group': 'ORG',
'score': 0.8498471,
'word': '央 视',
'start': 61,
'end': 63},
{'entity_group': 'PER',
'score': 0.985396,
'word': '李 明',
'start': 63,
'end': 65}]
# 根据start和end取实际的结果
ner_result = {}
for r in res:
if r["entity_group"] not in ner_result:
ner_result[r["entity_group"]] = []
ner_result[r["entity_group"]].append(line[r["start"]: r["end"]])
ner_result
{'LOC': ['北京市', '上海市', '中国'], 'ORG': ['央视'], 'PER': ['李明']}
2、多项选择
2.1 多项选择简介
2.1.1 多项选择介绍
多项选择 (Multiple Choice) 任务是机器阅读理解任务中的一种形式,相较机器阅读理解定义而言,多项选择任务还需要提供答案的候选项。
即给定一个或者多个文档P,以及一个问题Q和对应的多个答案候选,输出问题Q的答案A,A是答案候选中的某一个选项。
context(上下文):
女: 怎么样?买到票了吗?
男: 火车站好多人啊,我排了整整一天的队,等排到我了,他们说没票了,要等过了年才有。
女: 没关系,过了年回来也是一样的。
男: 公司初六就上班了,我怕过了年来不及。要不今年您和我爸来上海过年吧?
女: 我这老胳膊老腿的不想折腾了。
男: 一点儿不折腾,等我帮你们买好票,你们直接过来就行。
Question(问题):
男的遇到了什么困难?
Choices(选择):
A. 公司不放假
B. 过年东西贵
C. 没买到车票 ✔
D. 找不到车站
2.1.2 多项选择数据预处理
如下图,我们可以把每一个选项与上下文及问题进行组合。
这里用代码说一下具体的实现
from transformers import AutoTokenizer
import numpy as np
model_path = r'D:\python\models\chinese-macbert-base'
tokenizer = AutoTokenizer.from_pretrained(model_path)
def process_function(examples):
# examples, dict, keys: ["context", "quesiton", "choice", "answer"]
context = []
question_choice = []
labels = []
for idx in range(len(examples["context"])):
ctx = "\n".join(examples["context"][idx])
question = examples["question"][idx]
choices = examples["choice"][idx]
for choice in choices:
context.append(ctx)
question_choice.append(question + " " + choice)
# 如果选择不够4个,就进行补全
if len(choices) < 4:
for _ in range(4 - len(choices)):
context.append(ctx)
question_choice.append(question + " " + "不知道")
labels.append(choices.index(examples["answer"][idx]))
# 经过tokenizer后,input_ids的shape: 8 * 30
# truncation="only_first"表示仅仅截取context
tokenized_examples = tokenizer(context, question_choice, truncation="only_first", max_length=30, padding="max_length")
# 这里扩展一个维度:为num_choices
# 扩展后,shape变为:2 * 4 * 30
tokenized_examples = {k: [v[i: i + 4] for i in range(0, len(v), 4)] for k, v in tokenized_examples.items()}
tokenized_examples["labels"] = labels
return tokenized_examples
if __name__ == '__main__':
l = {
'id': [0, 1],
'context': [
['男:你今天晚上有时间吗?我们一起去看电影吧?', '女:你喜欢恐怖片和爱情片,但是我喜欢喜剧片,科幻片一般。所以……'],
['男:足球比赛是明天上午八点开始吧?', '女:因为天气不好,比赛改到后天下午三点了。']
],
'question': ['女的最喜欢哪种电影?', '根据对话,可以知道什么?'],
'choice': [
['恐怖片', '爱情片', '喜剧片', '科幻片'],
['今天天气不好', '比赛时间变了', '校长忘了时间']
],
'answer': ['喜剧片', '比赛时间变了']
}
print(np.asarray(process_function(l)['input_ids']).shape)
# (2, 4, 30)
然后,把[CLS]拿出做softmax,得到最终结果。
2.1.3 多项选择所用模型
多项选择任务用到的模型是AutoModelForMultipleChoice,我们看下BertForMultipleChoice源码
class BertForMultipleChoice(BertPreTrainedModel):
def __init__(self, config):
super().__init__(config)
self.bert = BertModel(config)
classifier_dropout = (
config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob
)
self.dropout = nn.Dropout(classifier_dropout)
# 单项选择
self.classifier = nn.Linear(config.hidden_size, 1)
# Initialize weights and apply final processing
self.post_init()
@add_start_docstrings_to_model_forward(BERT_INPUTS_DOCSTRING.format("batch_size, num_choices, sequence_length"))
@add_code_sample_docstrings(
checkpoint=_CHECKPOINT_FOR_DOC,
output_type=MultipleChoiceModelOutput,
config_class=_CONFIG_FOR_DOC,
)
def forward(
self,
input_ids: Optional[torch.Tensor] = None,
attention_mask: Optional[torch.Tensor] = None,
token_type_ids: Optional[torch.Tensor] = None,
position_ids: Optional[torch.Tensor] = None,
head_mask: Optional[torch.Tensor] = None,
inputs_embeds: Optional[torch.Tensor] = None,
labels: Optional[torch.Tensor] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
) -> Union[Tuple[torch.Tensor], MultipleChoiceModelOutput]:
r"""
labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
Labels for computing the multiple choice classification loss. Indices should be in `[0, ...,
num_choices-1]` where `num_choices` is the size of the second dimension of the input tensors. (See
`input_ids` above)
"""
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
# input_ids原始的shape为[batch, num_choices, seq_len]
# 多项选择的数量
num_choices = input_ids.shape[1] if input_ids is not None else inputs_embeds.shape[1]
# 将input_ids的shape改成[batch * num_choices, seq_len]
input_ids = input_ids.view(-1, input_ids.size(-1)) if input_ids is not None else None
attention_mask = attention_mask.view(-1, attention_mask.size(-1)) if attention_mask is not None else None
token_type_ids = token_type_ids.view(-1, token_type_ids.size(-1)) if token_type_ids is not None else None
position_ids = position_ids.view(-1, position_ids.size(-1)) if position_ids is not None else None
inputs_embeds = (
inputs_embeds.view(-1, inputs_embeds.size(-2), inputs_embeds.size(-1))
if inputs_embeds is not None
else None
)
# 经过Bert
outputs = self.bert(
input_ids,
attention_mask=attention_mask,
token_type_ids=token_type_ids,
position_ids=position_ids,
head_mask=head_mask,
inputs_embeds=inputs_embeds,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
# outputs[1]的shape为 [batch * num_choices, hidden_dim]
# outputs[1]是pooled_output
# 注意:pooled_output就是将[CLS]这个token再过一下全连接层+Tanh激活函数,作为该句子的特征向量
# 多项选择--用【CLS】这个token的向量,输入到全连接层
pooled_output = outputs[1]
# shape为 [batch * num_choices, hidden_dim]
pooled_output = self.dropout(pooled_output)
# shape为 [batch * num_choices, 1]
logits = self.classifier(pooled_output)
# shape为 [batch , num_choices]
reshaped_logits = logits.view(-1, num_choices)
loss = None
if labels is not None:
loss_fct = CrossEntropyLoss()
loss = loss_fct(reshaped_logits, labels)
if not return_dict:
output = (reshaped_logits,) + outputs[2:]
return ((loss,) + output) if loss is not None else output
return MultipleChoiceModelOutput(
loss=loss,
logits=reshaped_logits,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
)
2.2 基于Transformers进行多项选择
模型仍然使用哈工大开源的chinese-macbert-base
这次使用的数据集是c3数据集,可以在huggingface官网中下载。
2.2.1 加载数据集
import evaluate
from datasets import DatasetDict
from transformers import AutoTokenizer, AutoModelForMultipleChoice, TrainingArguments, Trainer
import warnings
warnings.filterwarnings("ignore")
c3 = DatasetDict.load_from_disk("./c3/")
c3
DatasetDict({
test: Dataset({
features: ['id', 'context', 'question', 'choice', 'answer'],
num_rows: 1625
})
train: Dataset({
features: ['id', 'context', 'question', 'choice', 'answer'],
num_rows: 11869
})
validation: Dataset({
features: ['id', 'context', 'question', 'choice', 'answer'],
num_rows: 3816
})
})
c3["train"][:2]
{
'id': [0, 1],
'context': [
['男:你今天晚上有时间吗?我们一起去看电影吧?', '女:你喜欢恐怖片和爱情片,但是我喜欢喜剧片,科幻片一般。所以……'],
['男:足球比赛是明天上午八点开始吧?', '女:因为天气不好,比赛改到后天下午三点了。']
],
'question': ['女的最喜欢哪种电影?', '根据对话,可以知道什么?'],
'choice': [
['恐怖片', '爱情片', '喜剧片', '科幻片'],
['今天天气不好', '比赛时间变了', '校长忘了时间']
],
'answer': ['喜剧片', '比赛时间变了']
}
c3.pop("test")
c3
DatasetDict({
train: Dataset({
features: ['id', 'context', 'question', 'choice', 'answer'],
num_rows: 11869
})
validation: Dataset({
features: ['id', 'context', 'question', 'choice', 'answer'],
num_rows: 3816
})
})
2.2.2 数据预处理
model_path = '/root/autodl-fs/models/chinese-macbert-base'
tokenizer = AutoTokenizer.from_pretrained(model_path)
def process_function(examples):
# examples, dict, keys: ["context", "quesiton", "choice", "answer"]
# examples, 1000
context = []
question_choice = []
labels = []
for idx in range(len(examples["context"])):
ctx = "\n".join(examples["context"][idx])
question = examples["question"][idx]
choices = examples["choice"][idx]
for choice in choices:
context.append(ctx)
question_choice.append(question + " " + choice)
if len(choices) < 4:
for _ in range(4 - len(choices)):
context.append(ctx)
question_choice.append(question + " " + "不知道")
labels.append(choices.index(examples["answer"][idx]))
tokenized_examples = tokenizer(context, question_choice, truncation="only_first", max_length=256, padding="max_length") # input_ids: 4000 * 256,
tokenized_examples = {k: [v[i: i + 4] for i in range(0, len(v), 4)] for k, v in tokenized_examples.items()} # 1000 * 4 *256
tokenized_examples["labels"] = labels
return tokenized_examples
tokenized_c3 = c3.map(process_function, batched=True)
tokenized_c3
DatasetDict({
train: Dataset({
features: ['id', 'context', 'question', 'choice', 'answer', 'input_ids', 'token_type_ids', 'attention_mask', 'labels'],
num_rows: 11869
})
validation: Dataset({
features: ['id', 'context', 'question', 'choice', 'answer', 'input_ids', 'token_type_ids', 'attention_mask', 'labels'],
num_rows: 3816
})
})
2.2.3 加载模型、创建评估函数
import numpy as np
model = AutoModelForMultipleChoice.from_pretrained(model_path)
# 这里采用离线加载
accuracy_path = '/root/autodl-tmp/transformers-code/metrics/accuracy'
accuracy = evaluate.load(accuracy_path)
def compute_metric(pred):
predictions, labels = pred
predictions = np.argmax(predictions, axis=-1)
return accuracy.compute(predictions=predictions, references=labels)
2.2.4 创建TrainingArguments及Trainer
args = TrainingArguments(
output_dir="./muliple_choice",
per_device_train_batch_size=4,
per_gpu_eval_batch_size=4,
num_train_epochs=1,
logging_steps=50,
evaluation_strategy="epoch",
save_strategy="epoch",
load_best_model_at_end=True,
fp16=True
)
trainer = Trainer(
model=model,
args=args,
train_dataset=tokenized_c3["train"],
eval_dataset=tokenized_c3["validation"],
compute_metrics=compute_metric
)
trainer.train()
2.2.5 模型预测
from typing import Any
import torch
# 自定义多项选择的Pipeline
class MultipleChoicePipeline:
def __init__(self, model, tokenizer) -> None:
self.model = model
self.tokenizer = tokenizer
self.device = model.device
def preprocess(self, context, quesiton, choices):
cs, qcs = [], []
for choice in choices:
cs.append(context)
qcs.append(quesiton + " " + choice)
return tokenizer(cs, qcs, truncation="only_first", max_length=256, return_tensors="pt")
def predict(self, inputs):
inputs = {k: v.unsqueeze(0).to(self.device) for k, v in inputs.items()}
return self.model(**inputs).logits
def postprocess(self, logits, choices):
predition = torch.argmax(logits, dim=-1).cpu().item()
return choices[predition]
def __call__(self, context, question, choices) -> Any:
# 前处理
inputs = self.preprocess(context, question, choices)
# 调用模型预测
logits = self.predict(inputs)
# 后处理
result = self.postprocess(logits, choices)
return result
pipe = MultipleChoicePipeline(model, tokenizer)
pipe("小明在北京上班", "小明在哪里上班?", ["北京", "上海", "河北", "海南", "河北", "海南"])
'北京'