摘要:
记录昇思MindSpore AI框架使用LSTM+CRF模型分词标注的步骤和方法。包括环境准备、score计算、Normalizer计算、Viterbi算法、CRF组合,以及改进的双向LSTM+CRF模型。
一、概念
1.序列标注
标注标签输入序列中的每个Token
用于抽取文本信息
分词(Word Segmentation)
词性标注(Position Tagging)
命名实体识别(Named Entity Recognition, NER)
例如:
输入序列 | 清 | 华 | 大 | 学 | 座 | 落 | 于 | 首 | 都 | 北 | 京 |
输出标注 | B | I | I | I | O | O | O | O | O | B | I |
清华大学 和 北京是地名,标签后便于识别实体
“BIOE”标注法:实体(Entity)的开头标注为B,其他部分标注为I,非实体标注为O
2.条件随机场(Conditional Random Field, CRF)
标注序列
标签预测序列中每个Token,
简单的多分类问题
相邻Token直接有关联关系
输入序列 | 清 | 华 | 大 | 学 | |
输出标注 | B | I | I | I | √ |
输出标注 | O | I | I | I | × |
正确实体中的Token有依赖关系
I前必须是B或I
错误标注O违背了依赖
引入学习关联关系的算法----条件随机场概率图模型保证依赖正确性。
条件随机场
定义
参数化
序列标注问题的线性序列
选用条件随机场特指线性链条件随机场(Linear Chain CRF)
3.公式
x={x0,...,xn} 输入序列
y={y0,...,yn},y∈Y 输出标注序列
n 序列最大长度
Y x对应的所有可能的输出序列集合
输出序列y的概率为:
(1)
序列第i个Token
对应的标签
Score 捕获相邻标签和之间的关系
定义概率函数:
1. 发射概率函数ΨEMIT :表示 →的概率。
2. 转移概率函数ΨTRANS:表示→的概率。
Score计算公式:
T 标签集合
P 构造大小为|T|*|T|的矩阵
存储标签间转移概率
ℎ 编码层(Dense、LSTM等)输出的隐状态
发射概率
Score计算公式转化为:
实现CRF参数化形式
CRF层前向训练部分
CRF和损失函数合并
选择负对数似然函数(Negative Log Likelihood, NLL)
(4)
代入公式(1)
公式(5),被减数Normalizer,减数Score,分别实现后相减得Loss。
二、环境准备
%%capture captured_output
# 实验环境已经预装了mindspore==2.2.14,如需更换mindspore版本,可更改下面mindspore的版本号
!pip uninstall mindspore -y
!pip install -i https://pypi.mirrors.ustc.edu.cn/simple mindspore==2.2.14
# 查看当前 mindspore 版本
!pip show mindspore
输出:
Name: mindspore
Version: 2.2.14
Summary: MindSpore is a new open source deep learning training/inference framework that could be used for mobile, edge and cloud scenarios.
Home-page: https://www.mindspore.cn
Author: The MindSpore Authors
Author-email: contact@mindspore.cn
License: Apache 2.0
Location: /home/nginx/miniconda/envs/jupyter/lib/python3.9/site-packages
Requires: asttokens, astunparse, numpy, packaging, pillow, protobuf, psutil, scipy
Required-by:
三、Score计算
公式(3)计算正确标签序列所对应的得分,
转移概率矩阵P
两个大小为|T|的向量
序列开始时的转移概率
序列结束时的转移概率
mask 掩码矩阵忽略打包序列时的填充值
Score 仅计算有效Token
def compute_score(emissions, tags, seq_ends, mask, trans, start_trans, end_trans):
# emissions: (seq_length, batch_size, num_tags)
# tags: (seq_length, batch_size)
# mask: (seq_length, batch_size)
seq_length, batch_size = tags.shape
mask = mask.astype(emissions.dtype)
# 将score设置为初始转移概率
# shape: (batch_size,)
score = start_trans[tags[0]]
# score += 第一次发射概率
# shape: (batch_size,)
score += emissions[0, mnp.arange(batch_size), tags[0]]
for i in range(1, seq_length):
# 标签由i-1转移至i的转移概率(当mask == 1时有效)
# shape: (batch_size,)
score += trans[tags[i - 1], tags[i]] * mask[i]
# 预测tags[i]的发射概率(当mask == 1时有效)
# shape: (batch_size,)
score += emissions[i, mnp.arange(batch_size), tags[i]] * mask[i]
# 结束转移
# shape: (batch_size,)
last_tags = tags[seq_ends, mnp.arange(batch_size)]
# score += 结束转移概率
# shape: (batch_size,)
score += end_trans[last_tags]
return score
四、Normalizer计算
公式(5)中被减数Normalizer的计算
x对应的所有可能输出序列的Score的对数指数和(Log-Sum-Exp)
穷举法计算
计算每个可能的输出序列Score
共有|T|个结果
动态规划算法
复用计算结果提高效率
计算
先计算
Normalizer简化为:
第i个Token发射概率
P 转移矩阵。
和P与y序列计算无关,可提出得:
由公式(7),Normalizer的实现如下:
def compute_normalizer(emissions, mask, trans, start_trans, end_trans):
# emissions: (seq_length, batch_size, num_tags)
# mask: (seq_length, batch_size)
seq_length = emissions.shape[0]
# 将score设置为初始转移概率,并加上第一次发射概率
# shape: (batch_size, num_tags)
score = start_trans + emissions[0]
for i in range(1, seq_length):
# 扩展score的维度用于总score的计算
# shape: (batch_size, num_tags, 1)
broadcast_score = score.expand_dims(2)
# 扩展emission的维度用于总score的计算
# shape: (batch_size, 1, num_tags)
broadcast_emissions = emissions[i].expand_dims(1)
# 根据公式(7),计算score_i
# 此时broadcast_score是由第0个到当前Token所有可能路径
# 对应score的log_sum_exp
# shape: (batch_size, num_tags, num_tags)
next_score = broadcast_score + trans + broadcast_emissions
# 对score_i做log_sum_exp运算,用于下一个Token的score计算
# shape: (batch_size, num_tags)
next_score = ops.logsumexp(next_score, axis=1)
# 当mask == 1时,score才会变化
# shape: (batch_size, num_tags)
score = mnp.where(mask[i].expand_dims(1), next_score, score)
# 最后加结束转移概率
# shape: (batch_size, num_tags)
score += end_trans
# 对所有可能的路径得分求log_sum_exp
# shape: (batch_size,)
return ops.logsumexp(score, axis=1)
五、Viterbi算法
实现解码部分
选择适合求解序列最优路径的Viterbi算法
动态规划求解所有可能的预测序列得分
保存第i个Token对应的score取值最大的标签
Viterbi算法求解最优预测序列要用
最大概率得分Score
每个Token对应的标签历史History
根据Viterbi算法得到公式:
逆序求解每一个概率最大的标签,构成最佳的预测序列。
def viterbi_decode(emissions, mask, trans, start_trans, end_trans):
# emissions: (seq_length, batch_size, num_tags)
# mask: (seq_length, batch_size)
seq_length = mask.shape[0]
score = start_trans + emissions[0]
history = ()
for i in range(1, seq_length):
broadcast_score = score.expand_dims(2)
broadcast_emission = emissions[i].expand_dims(1)
next_score = broadcast_score + trans + broadcast_emission
# 求当前Token对应score取值最大的标签,并保存
indices = next_score.argmax(axis=1)
history += (indices,)
next_score = next_score.max(axis=1)
score = mnp.where(mask[i].expand_dims(1), next_score, score)
score += end_trans
return score, history
def post_decode(score, history, seq_length):
# 使用Score和History计算最佳预测序列
batch_size = seq_length.shape[0]
seq_ends = seq_length - 1
# shape: (batch_size,)
best_tags_list = []
# 依次对一个Batch中每个样例进行解码
for idx in range(batch_size):
# 查找使最后一个Token对应的预测概率最大的标签,
# 并将其添加至最佳预测序列存储的列表中
best_last_tag = score[idx].argmax(axis=0)
best_tags = [int(best_last_tag.asnumpy())]
# 重复查找每个Token对应的预测概率最大的标签,加入列表
for hist in reversed(history[:seq_ends[idx]]):
best_last_tag = hist[idx][best_tags[-1]]
best_tags.append(int(best_last_tag.asnumpy()))
# 将逆序求解的序列标签重置为正序
best_tags.reverse()
best_tags_list.append(best_tags)
return best_tags_list
六、CRF层
组装CRF层
输入序列可能存在Padding
输入序列的真实长度,seq_length参数
生成mask矩阵的sequence_mask方法。
nn.Cell封装
CRF层代码:
import mindspore as ms
import mindspore.nn as nn
import mindspore.ops as ops
import mindspore.numpy as mnp
from mindspore.common.initializer import initializer, Uniform
def sequence_mask(seq_length, max_length, batch_first=False):
"""根据序列实际长度和最大长度生成mask矩阵"""
range_vector = mnp.arange(0, max_length, 1, seq_length.dtype)
result = range_vector < seq_length.view(seq_length.shape + (1,))
if batch_first:
return result.astype(ms.int64)
return result.astype(ms.int64).swapaxes(0, 1)
class CRF(nn.Cell):
def __init__(self, num_tags: int, batch_first: bool = False, reduction: str = 'sum') -> None:
if num_tags <= 0:
raise ValueError(f'invalid number of tags: {num_tags}')
super().__init__()
if reduction not in ('none', 'sum', 'mean', 'token_mean'):
raise ValueError(f'invalid reduction: {reduction}')
self.num_tags = num_tags
self.batch_first = batch_first
self.reduction = reduction
self.start_transitions = ms.Parameter(initializer(Uniform(0.1), (num_tags,)), name='start_transitions')
self.end_transitions = ms.Parameter(initializer(Uniform(0.1), (num_tags,)), name='end_transitions')
self.transitions = ms.Parameter(initializer(Uniform(0.1), (num_tags, num_tags)), name='transitions')
def construct(self, emissions, tags=None, seq_length=None):
if tags is None:
return self._decode(emissions, seq_length)
return self._forward(emissions, tags, seq_length)
def _forward(self, emissions, tags=None, seq_length=None):
if self.batch_first:
batch_size, max_length = tags.shape
emissions = emissions.swapaxes(0, 1)
tags = tags.swapaxes(0, 1)
else:
max_length, batch_size = tags.shape
if seq_length is None:
seq_length = mnp.full((batch_size,), max_length, ms.int64)
mask = sequence_mask(seq_length, max_length)
# shape: (batch_size,)
numerator = compute_score(emissions, tags, seq_length-1, mask, self.transitions, self.start_transitions, self.end_transitions)
# shape: (batch_size,)
denominator = compute_normalizer(emissions, mask, self.transitions, self.start_transitions, self.end_transitions)
# shape: (batch_size,)
llh = denominator - numerator
if self.reduction == 'none':
return llh
if self.reduction == 'sum':
return llh.sum()
if self.reduction == 'mean':
return llh.mean()
return llh.sum() / mask.astype(emissions.dtype).sum()
def _decode(self, emissions, seq_length=None):
if self.batch_first:
batch_size, max_length = emissions.shape[:2]
emissions = emissions.swapaxes(0, 1)
else:
batch_size, max_length = emissions.shape[:2]
if seq_length is None:
seq_length = mnp.full((batch_size,), max_length, ms.int64)
mask = sequence_mask(seq_length, max_length)
return viterbi_decode(emissions, mask, self.transitions, self.start_transitions, self.end_transitions)
七、BiLSTM+CRF模型
双向LSTM+CRF模型训练命名实体识别任务。
模型结构如下:
nn.Embedding -> nn.LSTM -> nn.Dense -> CRF
LSTM 提取序列特征
Dense层 变换获得发射概率矩阵
CRF层
具体代码:
class BiLSTM_CRF(nn.Cell):
def __init__(self, vocab_size, embedding_dim, hidden_dim, num_tags, padding_idx=0):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)
self.lstm = nn.LSTM(embedding_dim, hidden_dim // 2, bidirectional=True, batch_first=True)
self.hidden2tag = nn.Dense(hidden_dim, num_tags, 'he_uniform')
self.crf = CRF(num_tags, batch_first=True)
def construct(self, inputs, seq_length, tags=None):
embeds = self.embedding(inputs)
outputs, _ = self.lstm(embeds, seq_length=seq_length)
feats = self.hidden2tag(outputs)
crf_outs = self.crf(feats, tags, seq_length)
return crf_outs
生成两句示例和对应的标签
构造词表和标签表
embedding_dim = 16
hidden_dim = 32
training_data = [(
"清 华 大 学 坐 落 于 首 都 北 京".split(),
"B I I I O O O O O B I".split()
), (
"重 庆 是 一 个 魔 幻 城 市".split(),
"B I O O O O O O O".split()
)]
word_to_idx = {}
word_to_idx['<pad>'] = 0
for sentence, tags in training_data:
for word in sentence:
if word not in word_to_idx:
word_to_idx[word] = len(word_to_idx)
tag_to_idx = {"B": 0, "I": 1, "O": 2}
len(word_to_idx)
输出:
21
实例化模型
选择优化器
Wrapper封装模型和优化器
model = BiLSTM_CRF(len(word_to_idx), embedding_dim, hidden_dim, len(tag_to_idx))
optimizer = nn.SGD(model.trainable_params(), learning_rate=0.01, weight_decay=1e-4)
grad_fn = ms.value_and_grad(model, None, optimizer.parameters)
def train_step(data, seq_length, label):
loss, grads = grad_fn(data, seq_length, label)
optimizer(grads)
return loss
生成数据打包成Batch
按序列最大长度,填充长度不足的序列,
返回Tensor
输入序列
输出标签
序列长度
def prepare_sequence(seqs, word_to_idx, tag_to_idx):
seq_outputs, label_outputs, seq_length = [], [], []
max_len = max([len(i[0]) for i in seqs])
for seq, tag in seqs:
seq_length.append(len(seq))
idxs = [word_to_idx[w] for w in seq]
labels = [tag_to_idx[t] for t in tag]
idxs.extend([word_to_idx['<pad>'] for i in range(max_len - len(seq))])
labels.extend([tag_to_idx['O'] for i in range(max_len - len(seq))])
seq_outputs.append(idxs)
label_outputs.append(labels)
return ms.Tensor(seq_outputs, ms.int64), \
ms.Tensor(label_outputs, ms.int64), \
ms.Tensor(seq_length, ms.int64)
输出:
data, label, seq_length = prepare_sequence(training_data, word_to_idx, tag_to_idx)
data.shape, label.shape, seq_length.shape
((2, 11), (2, 11), (2,))
预编译模型
训练500step
训练流程可视化依赖tqdm库
安装pip install tqdm
from tqdm import tqdm
steps = 500
with tqdm(total=steps) as t:
for i in range(steps):
loss = train_step(data, seq_length, label)
t.set_postfix(loss=loss)
t.update(1)
输出:
0%| | 0/500 [00:00<?, ?it/s]
/
100%|██████████| 500/500 [04:33<00:00, 1.83it/s, loss=0.33540726]
score, history = model(data, seq_length)
score
\
输出:
Tensor(shape=[2, 3], dtype=Float32, value=
[[ 3.26808167e+01, 3.74181900e+01, 3.23572197e+01],
[ 2.94694691e+01, 2.79099541e+01, 3.44803162e+01]])
预测得分后处理
predict = post_decode(score, history, seq_length)
predict
输出:
[[0, 1, 1, 1, 2, 2, 2, 2, 2, 0, 1], [0, 1, 2, 2, 2, 2, 2, 2, 2]]
转换预测的index序列为标签序列
idx_to_tag = {idx: tag for tag, idx in tag_to_idx.items()}
def sequence_to_tag(sequences, idx_to_tag):
outputs = []
for seq in sequences:
outputs.append([idx_to_tag[i] for i in seq])
return outputs
sequence_to_tag(predict, idx_to_tag)
输出:
[['B', 'I', 'I', 'I', 'O', 'O', 'O', 'O', 'O', 'B', 'I'],
['B', 'I', 'O', 'O', 'O', 'O', 'O', 'O', 'O']]