☕️ 本文系列文章汇总:
(1)HMM开篇:基本概念和几个要素
(2)HMM计算问题:前后向算法
代码实现
(3)HMM学习问题:Baum-Welch算法
代码实现
(4) HMM预测问题:维特比算法本篇算法原理分析及公式推导请参考: HMM预测问题:维特比算法
目录
1. 模型参数估计
2. 维特比实现
3. 完整代码Github
4. 实例
事实上维特比算法属于隐马尔科夫模型的“应用篇”,特别是在NLP的分词领域,维特比算法无处不在。我们先需要根据HMM的学习算法来学习得到一个模型λ=(π,A,B),然后再通过这个模型,利用维特比算法对数据进行预测。本篇基于维特比算法实现一个简单的分词器,有助于大家深入理解。
1. 模型参数估计
我们先通过训练集来估计出一个模型。训练集是一堆已经分好词的文本,一行一条训练样本。在训练集中,我们的观测数据是每一个字,我们的状态是每一个字对应的分词标志,一共有4种状态:S,表示单字成词;B,表示一个分出来的词的起始字;M,表示一个分出来的词的中间字;E,表示一个分出来的词的结尾字。例如:
说|什么|难过|,|只不过|是|一次|错过
S|BE|BE|S|BME|S|BE|BE
注意,由于我们的训练集包含了事实上包含了观测值和状态值,因此我们不需要用无监督的Baum Welch算法来学习模型,只需要简单的有监督统计方法来估计模型参数即可,这个思想主要用到《统计学习方法》中10.3.1节中提到的方法。
class Model:
def __init__(self, trainfile, N, M, Q):
self.trainfile = trainfile
self.N = N
self.M = M
self.Pi = np.zeros(N)
self.A = np.zeros((N, N))
self.B = np.zeros((N, M))
self.Q2id = {x: i for i, x in enumerate(Q)}
def cal_rate(self):
reader = dataloader(self.trainfile)
for i, line in enumerate(reader):
line = line.strip().strip('\n')
if not line:
continue
word_list = line.split(' ')
status_sequence = []
# 计算π和B中每个元素的频数
for j, item in enumerate(word_list):
if len(item) == 1:
flag = 'S'
else:
flag = 'B' + 'M' * (len(item) - 2) + 'E'
if j == 0:
self.Pi[self.Q2id[flag[0]]] += 1
for t, s in enumerate(flag):
self.B[self.Q2id[s]][ord(item[t])] += 1
status_sequence.extend(flag)
# 计算A元素的频数
for t, s in enumerate(status_sequence):
prev = status_sequence[t - 1]
self.A[self.Q2id[prev]][self.Q2id[s]] += 1
def generate_model(self):
"""
根据课本10.3.1介绍的方法,将频数参数矩阵π、A、B转换成频率参数矩阵,
并对每一个元素取log,将后续的乘法运算转成加法运算,方便计算。
"""
self.cal_rate()
norm = -2.718e+16
denominator = sum(self.Pi)
# 处理π
for i, pi in enumerate(self.Pi):
if pi == 0.:
self.Pi[i] = norm
else:
self.Pi[i] = np.log(pi / denominator)
# 处理A
for row in range(self.A.shape[0]):
denominator = sum(self.A[row])
for col, a in enumerate(self.A[row]):
if a == 0.:
self.A[row][col] = norm
else:
self.A[row][col] = np.log(a / denominator)
# 处理B
for row in range(self.B.shape[0]):
denominator = sum(self.B[row])
for col, b in enumerate(self.B[row]):
if b == 0.:
self.B[row][col] = norm
else:
self.B[row][col] = np.log(b / denominator)
return AttrDict(
pi=self.Pi,
A=self.A,
B=self.B
)
2. 维特比实现
这一部分的代码完全是按照课本中算法流程【10.5】中的步骤来的,注意矩阵的运算正确即可。
class Viterbi:
def __init__(self, model: dict):
self.pi = model.pi
self.A = model.A
self.B = model.B
def predict(self, datapath):
"""
根据算法10.5中的流程计算δ和ψ
"""
reader = dataloader(datapath)
self.O = [line.strip().strip('\n') for line in reader]
N = self.pi.shape[0]
self.segs = []
for o in self.O:
o = [w for w in o if w]
if not o:
self.segs.append([])
continue
T = len(o)
delta_t = np.zeros((T, N))
psi_t = np.zeros((T, N))
for t in range(T):
if not t:
delta_t[t][:] = self.pi + self.B.T[:][ord(o[0])] # 由于log转换,所以原先的*变成+
psi_t[t][:] = np.zeros((1, N))
else:
deltaTemp = delta_t[t - 1] + self.A.T
for i in range(N):
delta_t[t][i] = max(deltaTemp[:][i]) + self.B[i][ord(o[t])]
psi_t[t][i] = np.argmax(deltaTemp[:][i])
I = []
"""
这里是回溯的过程,目的在于找最优路径,记得最后的路径需要反转,因为我们是从T时刻往前
找的。
"""
maxNode = np.argmax(delta_t[-1][:])
I.append(int(maxNode))
for t in range(T - 1, 0, -1):
maxNode = int(psi_t[t][maxNode])
I.append(maxNode)
I.reverse()
self.segs.append(I)
def segment(self):
"""
得到最优路径状态序列后,我们就可以根据状态序列对句子进行分割了
"""
segments = []
for i, line in enumerate(self.segs):
curText = ""
temp = []
for j, w in enumerate(line):
if w == 0:
temp.append(self.O[i][j])
else:
if w != 3:
curText += self.O[i][j]
else:
curText += self.O[i][j]
temp.append(curText)
curText = ''
segments.append(temp)
return segments
3. 完整代码Github
import numpy as np
class AttrDict(dict):
# 一个小trick,将结果返回成一个字典格式
def __init__(self, *args, **kwargs):
super(AttrDict, self).__init__(*args, **kwargs)
self.__dict__ = self
def dataloader(datapath):
with open(datapath, 'r') as reader:
for line in reader:
yield line
class Model:
def __init__(self, trainfile, N, M, Q):
self.trainfile = trainfile
self.N = N
self.M = M
self.Pi = np.zeros(N)
self.A = np.zeros((N, N))
self.B = np.zeros((N, M))
self.Q2id = {x: i for i, x in enumerate(Q)}
def cal_rate(self):
reader = dataloader(self.trainfile)
for i, line in enumerate(reader):
line = line.strip().strip('\n')
if not line:
continue
word_list = line.split(' ')
status_sequence = []
# 计算π和B中每个元素的频数
for j, item in enumerate(word_list):
if len(item) == 1:
flag = 'S'
else:
flag = 'B' + 'M' * (len(item) - 2) + 'E'
if j == 0:
self.Pi[self.Q2id[flag[0]]] += 1
for t, s in enumerate(flag):
self.B[self.Q2id[s]][ord(item[t])] += 1
status_sequence.extend(flag)
# 计算A元素的频数
for t, s in enumerate(status_sequence):
prev = status_sequence[t - 1]
self.A[self.Q2id[prev]][self.Q2id[s]] += 1
def generate_model(self):
self.cal_rate()
norm = -2.718e+16
denominator = sum(self.Pi)
for i, pi in enumerate(self.Pi):
if pi == 0.:
self.Pi[i] = norm
else:
self.Pi[i] = np.log(pi / denominator)
for row in range(self.A.shape[0]):
denominator = sum(self.A[row])
for col, a in enumerate(self.A[row]):
if a == 0.:
self.A[row][col] = norm
else:
self.A[row][col] = np.log(a / denominator)
for row in range(self.B.shape[0]):
denominator = sum(self.B[row])
for col, b in enumerate(self.B[row]):
if b == 0.:
self.B[row][col] = norm
else:
self.B[row][col] = np.log(b / denominator)
return AttrDict(
pi=self.Pi,
A=self.A,
B=self.B
)
class Viterbi:
def __init__(self, model: dict):
self.pi = model.pi
self.A = model.A
self.B = model.B
def predict(self, datapath):
reader = dataloader(datapath)
self.O = [line.strip().strip('\n') for line in reader]
N = self.pi.shape[0]
self.segs = []
for o in self.O:
o = [w for w in o if w]
if not o:
self.segs.append([])
continue
T = len(o)
delta_t = np.zeros((T, N))
psi_t = np.zeros((T, N))
for t in range(T):
if not t:
delta_t[t][:] = self.pi + self.B.T[:][ord(o[0])] # 由于log转换,所以原先的*变成+
psi_t[t][:] = np.zeros((1, N))
else:
deltaTemp = delta_t[t - 1] + self.A.T
for i in range(N):
delta_t[t][i] = max(deltaTemp[:][i]) + self.B[i][ord(o[t])]
psi_t[t][i] = np.argmax(deltaTemp[:][i])
I = []
maxNode = np.argmax(delta_t[-1][:])
I.append(int(maxNode))
for t in range(T - 1, 0, -1):
maxNode = int(psi_t[t][maxNode])
I.append(maxNode)
I.reverse()
self.segs.append(I)
def segment(self):
segments = []
for i, line in enumerate(self.segs):
curText = ""
temp = []
for j, w in enumerate(line):
if w == 0:
temp.append(self.O[i][j])
else:
if w != 3:
curText += self.O[i][j]
else:
curText += self.O[i][j]
temp.append(curText)
curText = ''
segments.append(temp)
return segments
4. 实例
if __name__ == '__main__':
# 因为我们的观测值是用编码来表示汉字的,这里设置观测值为65536就是为了能最大限度覆盖所有可能出
# 现的汉字。
trainer = Model(N=4, M=65536, Q=['S', 'B', 'M', 'E'], trainfile='train.txt')
model = trainer.generate_model()
segment = Viterbi(model)
segment.predict('test.txt')
print(segment.segment())
我们的训练集大概长这样:
给一条测试数据:
分词后:
[['他', '强调', ',', '党校', '始终', '不', '变', '的', '初心', '就', '是', '为', '党育', '才', '、', '为', '党', '献策', '。', '各级', '党校', '要', '坚守', '这个', '初心', ',锐', '意', '进', '取', '、', '奋发', '有', '为', ',', '为', '全', '面建', '设社', '会', '主义现', '代化国', '家', '、', '全面', '推进', '中华', '民族', '伟大', '复兴', '作', '出', '新', '的', '贡献', '。']]
可以看出,这是一般非常粗糙的分词器,虽然有些词分的不准,但是总体上还是可以的。由于我们的模型参数估计方法不是自发的学习过程,所以对于语料的依赖特别强,语料中没见过的词,就可能分错。